Hi Malinga,

I think the following is happening:

1) The outer while loop picks a message, and the inner for loop finds a subscription for it.
2) If all subscriptions are full when we try message 1, we skip it.
3) But the loop continues and tries message 2 as well.
4) By the time we try message 2, we might have received an ack, so a subscription can be free again, and we send message 2.
5) Later we send message 1 as well, so it arrives after message 2.

I think if all subscriptions are full for some message, we have to abort the while loop and retry from the beginning later.

--Srinath

On Fri, May 10, 2013 at 1:24 PM, Malinga Purnasiri <[email protected]> wrote:

> Hi all,
>
> I think I have found the location where it breaks the order. It is in
> QueueDeliveryWorker.class, inside the following method.
>
> *Let me explain: the sout at LABEL A is in proper order, but the sout at
> LABEL B is not. This may be due to the else condition at LABEL C. Any idea?*
>
> public int sendMessagesToSubscriptions(String targetQueue, List<QueueEntry> messages){
>     int sentMessageCount = 0;
>     Iterator<QueueEntry> iterator = messages.iterator();
>     while (iterator.hasNext()) {
>         QueueEntry message = iterator.next();
>         System.out.println("Itarator.next() : " + message.getMessageHeader().getHeader("COUNT")); // ---------- LABEL A
>         boolean messageSent = false;
>         Map<String, CassandraSubscription> subscriptions4Queue = subscriptionMap.get(targetQueue);
>         if(subscriptions4Queue != null){
>             /*
>              * we do this in a for loop to avoid iterating over the subscriptions forever.
>              * We only iterate once for each subscription
>              */
>             for(int j =0;j< subscriptions4Queue.size();j++){
>                 CassandraSubscription cassandraSubscription = findNextSubscriptionToSent(targetQueue);
>                 if(isThisSubscriptionHasRoom(cassandraSubscription)){
>                     AMQProtocolSession session = cassandraSubscription.getSession();
>
>                     ((AMQMessage) message.getMessage()).setClientIdentifier(session);
>
>                     if(log.isDebugEnabled()){
>                         log.debug("readFromCassandra"+ AndesUtils.printAMQMessage(message));
>                     }
>                     System.out.println("to deliverAsynchronously : " + message.getMessageHeader().getHeader("COUNT")); // ---------- LABEL B
>
>                     deliverAsynchronously(cassandraSubscription.getSubscription(), message);
>                     totMsgSent++;
>                     sentMessageCount++;
>                     totalReadButUndeliveredMessages--;
>                     messageSent = true;
>                     iterator.remove();
>                     break;
>                 }
>             }
>             if(!messageSent){
>                 log.debug("All subscriptions for queue "+ targetQueue + " have max Unacked messages "+ queue.getName());
>             }
>         }else{
>             //All subscriptions deleted for the queue, should we move messages back to global queue? ---------- LABEL C
>         }
>     }
>     return sentMessageCount;
> }
>
> On Fri, May 10, 2013 at 9:45 AM, Srinath Perera <[email protected]> wrote:
>
>> Hi Malinga,
>>
>> CCing architecture@. We should discuss these in public.
>>
>> This code is there to prevent a client from getting too many messages that
>> it has not consumed. Otherwise, messages will pile up on the client side
>> and it will run out of memory.
>>
>> We have to make 5000 configurable.
>>
>> Also, one slow client must not slow down delivery to others. I think I
>> handled that when I wrote the code, but if we still have that problem we
>> have to fix it. Please feel free to come and bug me. I can help you
>> understand the code.
>>
>> --Srinath
>>
>> On Thu, May 9, 2013 at 10:37 PM, Hasitha Hiranya <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> Please note my comments inline.
>>>
>>> On Thu, May 9, 2013 at 10:14 PM, Malinga Purnasiri <[email protected]> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I had some concerns while trying to fix the sequence *out of order*
>>>> issue of the MB. Because of the complexity of the internal thread model,
>>>> I have divided the problem into 3 distinct small parts for easier analysis.
>>>>
>>>> *Part 1 : Messages to GQ (Global Q)*
>>>>
>>>> *Problem identified*: We had a parallel thread pool executor with 50
>>>> initial threads in the pool to do this task. In our meeting with Dr
>>>> Srinath, we identified that this thread model has some serious issues
>>>> which lead to out-of-order messages. So I changed it to a
>>>> SingleThreadedExecutor, and that problem was solved.
>>>> (I will write a *test case* for that tomorrow, create a patch for it,
>>>> and conclude this part.)
>>>>
>>>> *Before the fix*: I published 10,000 messages to the Q and found that
>>>> all the messages arrived at the GQ out of order.
>>>> *After the fix*: It is solved.
>>>>
>>>> *Part 2 : Messages from GQ to NQ*
>>>>
>>>> This had no issue whatsoever (with regard to the out-of-order issue).
>>>>
>>>> *Part 3 : Messages from NQ to Subscribers*
>>>>
>>>> Though I fixed the out-of-order issue in part 1, I realized that when
>>>> messages are delivered to the end client (subscriber) they are still out
>>>> of order. So I put some debug output into QueueDeliveryWorker.java,
>>>> inside the deliverAsynchronously() method.
>>>>
>>>> System.out.println("NQ to SUB : " + message.getMessageHeader().getHeader("COUNT"));
>>>>
>>>> *This failed to publish messages in sequence to the end client.*
>>>>
>>>> *Conclusion:*
>>>>
>>>> The QueueDeliveryWorker class is a bit complex and I'm still reading it.
>>>> I hope to fix this issue tomorrow. If you have any advice on this, just
>>>> tell me.
>>>>
>>> Great work! I'd like to help in understanding the QueueDeliveryWorker
>>> code. We need to identify the point where it breaks the order.
>>>
>>>> *Additional Issue:*
>>>>
>>>> *We have also found that when we publish a bulk of messages at the
>>>> sending end, the receiving end keeps pausing while retrieving the
>>>> messages. That is due to the following code in QueueDeliveryWorker.java:*
>>>>
>>>> if(!queueDeliveryInfo.messageIgnored){
>>>>     // TODO : in here we have some issue
>>>>     ....
>>>>
>>>>     if(queueDeliveryInfo.readButUndeliveredMessages.size() < 5000){
>>>>         queueDeliveryInfo.readButUndeliveredMessages.add(message);
>>>>         totalReadButUndeliveredMessages++;
>>>>         lastProcessedId = message.getMessage().getMessageNumber();
>>>>     }else{
>>>>         queueDeliveryInfo.messageIgnored = true;
>>>>     }
>>>>
>>>> If the (undelivered) message count reaches *5000* (a hard limit), it
>>>> sets the ignore-messages flag to *TRUE*. If we increase this number
>>>> (*5000 to 10,000*) we get messages WITHOUT ANY PAUSES.
>>>>
>>>> *My suggestion is to set this limit on demand (dynamically, without
>>>> any hard limit).*
>>>>
>>>> *Any clue on this as well?*
>>>>
>>>
>>> IMO, this was done to stop messages from filling up memory. We cannot
>>> overload the client, as the client may go OOM. If the client stops
>>> receiving messages, that means it has enough room. Then why were the
>>> messages not prefetched? We need to study the code and comprehend the story.
>>>
>>>> - Malinga
>>>>
>>>
>>> Thanks.
>>> --
>>> *Hasitha Abeykoon*
>>> Software Engineer; WSO2, Inc.; http://wso2.com
>>> *cell:* *+94 719363063*
>>> *blog:* abeykoon.blogspot.com <http://abeykoon.blogspot.com>
>>>
>>
>> --
>> ============================
>> Srinath Perera, Ph.D.
>> Senior Software Architect, WSO2 Inc.
>> Visiting Faculty, University of Moratuwa
>> Member, Apache Software Foundation
>> Research Scientist, Lanka Software Foundation
>> Blog: http://srinathsview.blogspot.com/
>> Photos: http://www.flickr.com/photos/hemapani/
>> Phone: 0772360902
>>
>

--
============================
Srinath Perera, Ph.D.
Senior Software Architect, WSO2 Inc.
Visiting Faculty, University of Moratuwa
Member, Apache Software Foundation
Research Scientist, Lanka Software Foundation
Blog: http://srinathsview.blogspot.com/
Photos: http://www.flickr.com/photos/hemapani/
Phone: 0772360902
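To make the suggestion at the top of this message concrete (abort the while loop as soon as no subscription can take the current message, and retry from the beginning later), here is a minimal sketch. It is not the QueueDeliveryWorker code: the `Subscription` class, the `sendMessagesInOrder` method, and the use of plain integers as messages are all hypothetical stand-ins, and the real round-robin subscription selection is simplified to a plain loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class OrderedDeliverySketch {

    /** Hypothetical stand-in for a subscription with a cap on unacked messages. */
    static class Subscription {
        final int maxUnacked;
        int unacked = 0;
        final List<Integer> delivered = new ArrayList<>();

        Subscription(int maxUnacked) {
            this.maxUnacked = maxUnacked;
        }

        boolean hasRoom() {
            return unacked < maxUnacked;
        }

        void deliver(int message) {
            delivered.add(message);
            unacked++;
        }

        void ack() {
            if (unacked > 0) {
                unacked--;
            }
        }
    }

    /**
     * Delivers pending messages in list order. Stops at the first message that
     * no subscription can take, leaving it and every later message in the list
     * so that the next call starts again from the oldest undelivered message.
     */
    static int sendMessagesInOrder(List<Integer> pending, List<Subscription> subscriptions) {
        int sentMessageCount = 0;
        Iterator<Integer> iterator = pending.iterator();
        while (iterator.hasNext()) {
            int message = iterator.next();
            boolean messageSent = false;
            for (Subscription subscription : subscriptions) {
                if (subscription.hasRoom()) {
                    subscription.deliver(message);
                    iterator.remove();
                    sentMessageCount++;
                    messageSent = true;
                    break;
                }
            }
            if (!messageSent) {
                // All subscriptions are full: abort instead of trying the next
                // message, otherwise a later message could overtake this one.
                break;
            }
        }
        return sentMessageCount;
    }

    public static void main(String[] args) {
        Subscription sub = new Subscription(2);
        List<Integer> pending = new ArrayList<>(Arrays.asList(1, 2, 3, 4));

        int first = sendMessagesInOrder(pending, Collections.singletonList(sub));
        System.out.println(first + " sent, pending " + pending); // 2 sent, pending [3, 4]

        sub.ack();
        sub.ack();
        int second = sendMessagesInOrder(pending, Collections.singletonList(sub));
        System.out.println(second + " sent, delivered " + sub.delivered); // 2 sent, delivered [1, 2, 3, 4]
    }
}
```

The only behavioural difference from the quoted method is the `break` in the `!messageSent` branch. The trade-off is that one full set of subscriptions now pauses the whole batch until the next run, which is what keeps the order intact.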
_______________________________________________ Architecture mailing list [email protected] https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
