Forwarding this to dev, since it may help anyone using the in-memory
mode of WSO2 MB.

---------- Forwarded message ----------
From: Malinga Purnasiri <[email protected]>
Date: Fri, Jun 7, 2013 at 12:25 PM
Subject: Re: Less messages received than sent
To: Srinath Perera <[email protected]>, Shammi Jayasinghe <[email protected]>,
Hasitha Hiranya <[email protected]>, Ishara Premadasa <[email protected]>


I had a chat with Srinath regarding the message-ignoring logic in in-memory
mode.

The issue identified during testing was: we just ignore the messages, and
the next time we get messages from the queue we also ignore the last-read
pointer (*in persistence mode Srinath is handling it*), so we lose all the
pending messages.

I have now fixed it: https://wso2.org/jira/browse/MB-293
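
To illustrate the problem described above, here is a minimal, self-contained sketch. It is not the MB-293 patch and does not use the WSO2 MB API: the class name, `drain`, `storeCursor`, and the tiny `BUFFER_LIMIT` are all hypothetical, and it assumes the in-memory store simply moves forward on every read. It compares a reader that ignores the last-read pointer with one that resumes from the last message it actually buffered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; message ids 1..messageCount stand in for queue entries.
class ReadPointerSketch {
    // The quoted code uses 5000; a small value keeps the example readable.
    static final int BUFFER_LIMIT = 3;

    // Reads batches until the store is exhausted and returns the ids that were
    // buffered (and, in this sketch, delivered right away).
    static List<Long> drain(int messageCount, int batchSize, boolean honourLastReadPointer) {
        List<Long> delivered = new ArrayList<>();
        long storeCursor = 0;      // assumed: the store advances this on every read
        long lastProcessedId = 0;  // id of the last message actually buffered
        while (true) {
            long from = honourLastReadPointer ? lastProcessedId : storeCursor;
            List<Long> batch = new ArrayList<>();
            for (long id = from + 1; id <= messageCount && batch.size() < batchSize; id++) {
                batch.add(id);
            }
            if (batch.isEmpty()) {
                return delivered;
            }
            storeCursor = batch.get(batch.size() - 1);

            List<Long> readButUndelivered = new ArrayList<>();
            for (long id : batch) {
                if (readButUndelivered.size() < BUFFER_LIMIT) {
                    readButUndelivered.add(id);
                    lastProcessedId = id;
                }
                // else: the message is ignored; only a later re-read can recover it
            }
            delivered.addAll(readButUndelivered);
        }
    }

    public static void main(String[] args) {
        System.out.println(drain(10, 5, false)); // [1, 2, 3, 6, 7, 8]
        System.out.println(drain(10, 5, true));  // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

In the first call, messages 4, 5, 9 and 10 are read but never buffered, and the next read starts past them, so they are gone. In the second call, each read resumes right after the last buffered message, so the ignored ones are picked up again.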



On Wed, Jun 5, 2013 at 11:18 PM, Malinga Purnasiri <[email protected]> wrote:

> Hi all,
>
> Just look at the following code block (this is from
> QueueDeliverWorker.java):
>
> if (isInMemoryMode){
>     CassandraMessageStore messageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
>     List<QueueEntry> messagesFromCassansdra = messageStore.getNextQueueMessagesToDeliver(queue, messageCountToRead);
>     System.out.println("Size : " + messagesFromCassansdra.size());
>     System.out.println("messageCountToRead : " + messageCountToRead);
>     for(QueueEntry message: messagesFromCassansdra){
>         String queueName = ((AMQMessage)message.getMessage()).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString();
>         QueueDeliveryInfo queueDeliveryInfo = getQueueDeliveryInfo(queueName);
>         if(!queueDeliveryInfo.messageIgnored){
>             if(queueDeliveryInfo.readButUndeliveredMessages.size() < 5000){
>                 System.out.println("limit exceeded : " + messagesFromCassansdra.size());
>                 queueDeliveryInfo.readButUndeliveredMessages.add(message);
>                 System.out.println("Read but undeliverd : " + queueDeliveryInfo.readButUndeliveredMessages.size());
>                 totalReadButUndeliveredMessages++;
>                 System.out.println("totalReadButUndeliveredMessages AFTER : " + totalReadButUndeliveredMessages);
>                 lastProcessedId = message.getMessage().getMessageNumber();
>             *}else{*
>                 *queueDeliveryInfo.messageIgnored = true;*
>             *}*
>         }
>     }
>
>
>
> Just concentrate on the bold else statement: if the buffer grows beyond
> 5000 elements, this code block just sets the ignore flag to TRUE, but the
> messages still being iterated in the for-each loop
> (for(QueueEntry message: messagesFromCassansdra){) are lost.
>
> Am I correct?
>
> We need some more sophisticated logic here, don't we? Or am I missing
> something? Please confirm. This is why we are losing a few messages when
> publishing a few hundred messages.
>
>
> I have a solution for this and will explain it later.
>
> --
> Malinga Pathmal,
> Technical Lead, WSO2, Inc. : http://wso2.com/
> Phone : (+94) 715335898
>



-- 
Malinga Pathmal,
Technical Lead, WSO2, Inc. : http://wso2.com/
Phone : (+94) 715335898


