Forwarding this to dev, since this may help others, whoever using in memory mode of wso2 MB.
---------- Forwarded message ---------- From: Malinga Purnasiri <[email protected]> Date: Fri, Jun 7, 2013 at 12:25 PM Subject: Re: Less messages received than sent To: Srinath Perera <[email protected]>, Shammi Jayasinghe <[email protected]>, Hasitha Hiranya <[email protected]>, Ishara Premadasa <[email protected]> had a chat with Srinath, regarding the message ignoring logic in in-mem mode, issue identified during the testing was : we just ignore the messages and next time getting the messages from the queue, we just ignore the last read pointer (*in persistence mode Srinath is doing it*) and we loose all the pending messages due to that. Now i have fixed it https://wso2.org/jira/browse/MB-293 On Wed, Jun 5, 2013 at 11:18 PM, Malinga Purnasiri <[email protected]>wrote: > Hi all, > > Just look at the Following code block, (this from the > QueueDeliverWorker.java) > > if (isInMemoryMode){ > CassandraMessageStore messageStore = > ClusterResourceHolder.getInstance().getCassandraMessageStore(); > List<QueueEntry> messagesFromCassansdra = > messageStore.getNextQueueMessagesToDeliver(queue, messageCountToRead); > System.out.println("Size : " + > messagesFromCassansdra.size()); > System.out.println("messageCountToRead : " + > messageCountToRead); > for(QueueEntry message: messagesFromCassansdra){ > String queueName = > ((AMQMessage)message.getMessage()).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString(); > QueueDeliveryInfo queueDeliveryInfo = > getQueueDeliveryInfo(queueName); > if(!queueDeliveryInfo.messageIgnored){ > > if(queueDeliveryInfo.readButUndeliveredMessages.size() < 5000){ > System.out.println("limit exceeded : " > + messagesFromCassansdra.size()); > > queueDeliveryInfo.readButUndeliveredMessages.add(message); > System.out.println("Read but > undeliverd : " + queueDeliveryInfo.readButUndeliveredMessages.size()); > totalReadButUndeliveredMessages++; > > System.out.println("totalReadButUndeliveredMessages AFTER : " + > totalReadButUndeliveredMessages); > lastProcessedId = > message.getMessage().getMessageNumber(); > *}else{* > * queueDeliveryInfo.messageIgnored = > true;* > * }* > } > } > > > > ********* Just concentrate on the bold else statement, if the buffer is > growing more than 5000 elements, this code block just set ignore flag to > TRUE. but still it loose message from the from each loop > (for(QueueEntry message: messagesFromCassansdra){) > > Am I correct ? > > We need some sophisticate logic here, isnt it ? or am i missing some part > ? Just confirm ... This is why we loosing few message when publish few > hundred messages. > > > I have solution here, will explain it later ... > > -- > Malinga Pathmal, > Technical Lead, WSO2, Inc. : http://wso2.com/ > Phone : (+94) 715335898 > -- Malinga Pathmal, Technical Lead, WSO2, Inc. : http://wso2.com/ Phone : (+94) 715335898 -- Malinga Pathmal, Technical Lead, WSO2, Inc. : http://wso2.com/ Phone : (+94) 715335898
_______________________________________________ Dev mailing list [email protected] http://wso2.org/cgi-bin/mailman/listinfo/dev
