Hi Malinga,

I think following is the case.

1) Outer while loop pick a message, and inner for loop finds a subscription

2) now if all subscriptions are full for message 1, we will skip that

3) but loop will continue and try the message 2 as well

4) by the time we try messages 2, we might have received a ack and then
subscription can be free, but then we send message 2

5) later we will send message 1 as well

I think if all subscriptions are full for some message. we have
to abort the while loop and later retry from beginning.

--Srinath


On Fri, May 10, 2013 at 1:24 PM, Malinga Purnasiri <[email protected]>wrote:

>
>
> Hi all,
>
> I think I have find the location it breaks the order. its in the
> QueueDeliveryWorker.class inside following method,
>
> *Lemme explain, sout in the LABLE A, its in proper order, but sout in the
> LABEL B is not. This may be due to else condition in the LABEL C. any idea ?
> *
>
> public int sendMessagesToSubscriptions(String targetQueue,
> List<QueueEntry> messages){
>         int sentMessageCount = 0;
>         Iterator<QueueEntry> iterator = messages.iterator();
>         while (iterator.hasNext()) {
>             QueueEntry message = iterator.next();
>             *System.out.println("Itarator.next() : " +
> message.getMessageHeader().getHeader("COUNT"));
> ---------------------------------- LABLE A*
>             boolean messageSent = false;
>             Map<String, CassandraSubscription> subscriptions4Queue =
> subscriptionMap.get(targetQueue);
>             if(subscriptions4Queue != null){
>                 /*
>                  * we do this in a for loop to avoid iterating for a
> subscriptions for ever. We only iterate as
>                  * once for each subscription
>                  */
>                 for(int j =0;j< subscriptions4Queue.size();j++){
>                     CassandraSubscription cassandraSubscription =
> findNextSubscriptionToSent(targetQueue);
>                     if(isThisSubscriptionHasRoom(cassandraSubscription)){
>                         AMQProtocolSession session =
> cassandraSubscription.getSession();
>
>                         ((AMQMessage)
> message.getMessage()).setClientIdentifier(session);
>
>                         if(log.isDebugEnabled()){
>                             log.debug("readFromCassandra"+
> AndesUtils.printAMQMessage(message));
>                         }
>                         *System.out.println("to deliverAsynchronously : "
> + message.getMessageHeader().getHeader("COUNT")); 
> **----------------------------------
> LABLE B*
>
> deliverAsynchronously(cassandraSubscription.getSubscription(), message);
>                         totMsgSent++;
>                         sentMessageCount++;
>                         totalReadButUndeliveredMessages--;
>                         messageSent = true;
>                         iterator.remove();
>                         break;
>                     }
>                 }
>                 if(!messageSent){
>                     log.debug("All subscriptions for queue "+ targetQueue
> + " have max Unacked messages "+ queue.getName());
>                 }
>             }else{
>                 *//All subscriptions deleted for the queue, should we
> move messages back to global queue? -------------- LABEL C*
>             }
>         }
>         return sentMessageCount;
>     }
>
>
>
>
> On Fri, May 10, 2013 at 9:45 AM, Srinath Perera <[email protected]> wrote:
>
>> Hi Malinga,
>>
>> CCing architecture@. Should discuss these in public.
>>
>> This code is there to avoid client getting too many messages that he did
>> not consume. Otherwise, messages will fill at the client side and it will
>> go out of memory.
>>
>> We have to make 5000 configurable.
>>
>> Also, one slow client must not slow down delivery to others. I think I
>> handled that when I wrote the code. But if we have that problem still we
>> have to fix. Please feel free to come and bug me. I can help understand the
>> code.
>>
>> --Srinath
>>
>>
>>
>>
>>
>> On Thu, May 9, 2013 at 10:37 PM, Hasitha Hiranya <[email protected]>wrote:
>>
>>> Hi,
>>>
>>> Please note my  comments inline.
>>>
>>>
>>> On Thu, May 9, 2013 at 10:14 PM, Malinga Purnasiri <[email protected]>wrote:
>>>
>>>> Hi All,
>>>>
>>>> I had some concerns when I'm trying to fix the sequence *out of 
>>>> order*issue of the MB. Since the internal thread model complexity I
>>>> have divided the problem into 3 distinct small parts for easy analysis.
>>>>
>>>> *Part 1 : Messages to GQ (Global Q) *
>>>>
>>>> *Problem identified* : We had Parallel thread pool executor of 50
>>>> initial threads in the pool to do this task. As we had meeting with Dr
>>>> Srinath, we identified that this thread model has some serious issues,
>>>> which will lead to out of order. So I have changed this into
>>>> SingleThreadedExecutor and that problem solved.
>>>> (I will write a *test case* for that during day tomorrow and create a
>>>> patch for that, and conclude that).
>>>>
>>>> *Before the fix* : I have publish 10,000 messages to the Q, and found
>>>> that all the messages comes out of order to GQ.
>>>> *After the fix *: It solved
>>>>
>>>> *Part 2 : Messages from GQ to NQ*
>>>>
>>>> This had no any issue whatsoever. (in regards to out of order issue).
>>>>
>>>> *Part 3 : Messages from NQ to Subscribers*
>>>>
>>>> Tho, I have fixed the out of order issue in the part 1, I have realize
>>>> that when it deliver to the end client (subscriber) it still out of order.
>>>> So I have put some debugs to QueueDiliveryWorker.java inside
>>>> deliverAsynchronously() method.
>>>>
>>>> System.out.println("NQ to SUB : " +
>>>> message.getMessageHeader().getHeader("COUNT"));
>>>>
>>>> *This failed to publish messages in sequence to the end client.*
>>>>
>>>> *Conclusion : *
>>>>
>>>> The class in the QueueDiliveryWorker is bit complex and I'm still
>>>> reading it. Hope to fix this issue within day tomorrow. If u guys have any
>>>> advice on this just tell me.
>>>>
>>>>    Great work! I'd like to help understanding  the  QueueDiliveryWorker
>>> code. We need to identify the point where it breaks the order.
>>>
>>>>
>>>> *Additional Issue : *
>>>> *
>>>> *
>>>> *And also we have encountered, when we publish bulk of messages at the
>>>> sending end; at the receving end, it keep on pausing while retriving the
>>>> messages. That is due to following code line. in the
>>>> QueueDiliverWorkier.java*
>>>>
>>>> if(*!queueDeliveryInfo.messageIgnored*){
>>>>                                 // TODO : in here we have some issue
>>>> ....
>>>>
>>>> if(queueDeliveryInfo.readButUndeliveredMessages.size() < *5000*){
>>>>
>>>> queueDeliveryInfo.readButUndeliveredMessages.add(message);
>>>>                                     totalReadButUndeliveredMessages++;
>>>>                                     lastProcessedId =
>>>> message.getMessage().getMessageNumber();
>>>>                                 }else{
>>>>                                     *queueDeliveryInfo.messageIgnored
>>>> = true;*
>>>>                                 }
>>>>
>>>> If the messages count (undelivered) higher than *5000* (hard limit) it
>>>> set the flag ignore messages to *TRUE*. if we increase this number (*5000
>>>> to 10,000*) we get messages WITHOUT ANY PAUSES.
>>>>
>>>> *My suggestion is to set this limit on demand (dynamically with out
>>>> any hard limits)*
>>>>
>>>> *any clue on this also.*
>>>>
>>>
>>>    IMO, this was done to stop messaging being filled up in memory. We
>>> cannot overload the client, as client may go OOM. If the client stops
>>> message receiving, that means it has enough room. Then why the messages
>>> were not prefetched? we need to study the code and comprehend the story.
>>>
>>>> *
>>>> *
>>>> - Malinga
>>>>
>>>
>>>
>>> Thanks.
>>> --
>>> *Hasitha Abeykoon*
>>> Software Engineer; WSO2, Inc.; http://wso2.com
>>> *cell:* *+94 719363063*
>>> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>* *
>>> *
>>> *
>>>
>>
>>
>>
>> --
>> ============================
>> Srinath Perera, Ph.D.
>>   Senior Software Architect, WSO2 Inc.
>>   Visiting Faculty, University of Moratuwa
>>   Member, Apache Software Foundation
>>   Research Scientist, Lanka Software Foundation
>>   Blog: http://srinathsview.blogspot.com/
>>   Photos: http://www.flickr.com/photos/hemapani/
>>  Phone: 0772360902
>>
>
>
>


-- 
============================
Srinath Perera, Ph.D.
  Senior Software Architect, WSO2 Inc.
  Visiting Faculty, University of Moratuwa
  Member, Apache Software Foundation
  Research Scientist, Lanka Software Foundation
  Blog: http://srinathsview.blogspot.com/
  Photos: http://www.flickr.com/photos/hemapani/
 Phone: 0772360902
_______________________________________________
Architecture mailing list
[email protected]
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture

Reply via email to