[ 
https://issues.apache.org/jira/browse/QPID-3079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13047165#comment-13047165
 ] 

jirapos...@reviews.apache.org commented on QPID-3079:
-----------------------------------------------------



bq.  On 2011-06-10 00:22:59, Kenneth Giusti wrote:
bq.  > /branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp, line 685
bq.  > <https://reviews.apache.org/r/860/diff/1/?file=20822#file20822line685>
bq.  >
bq.  >     The P.M. _could_ be being dequeued simultaneously from multiple 
queues.  Won't that just move the map from the queue to the P.M.?

I'm suggesting "pmsg->setCompletionCallback(callback)". No map. 
setCompletionCallback() may  be called concurrently in multiple deques but that 
does't require a map, just a lock. I think the completion context belongs on 
the message.


bq.  On 2011-06-10 00:22:59, Kenneth Giusti wrote:
bq.  > /branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp, line 1229
bq.  > <https://reviews.apache.org/r/860/diff/1/?file=20822#file20822line1229>
bq.  >
bq.  >     Perhaps - I still think we'd need to track multiple outstanding 
callbacks for a single message instance.

Why? All  you need to know is when they all complete. A simple atomic counter 
will do the job. I think we should be aiming to avoid multiple heap allocations 
per message wherever we can. Ideally I'd like to see the completion context 
boiled down to a few counters and allocated in-line as part of the Message. The 
interfaces are good, but the implementation does a lot of dynamic allocation. I 
suspect that will hurt performance.


bq.  On 2011-06-10 00:22:59, Kenneth Giusti wrote:
bq.  > /branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp, line 799
bq.  > <https://reviews.apache.org/r/860/diff/1/?file=20825#file20825line799>
bq.  >
bq.  >     Actually, it's very likely that all the messages that are in a 
Message.accept sequence are on the same queue.  I noticed that during debug - a 
flush of a single message.accept command resulted in multiple flush calls to 
the same queue.  According to Kim, multiple flushes to the same queue are 
ignored, so I could drop this, but that may not be true for store in general.

Why do we need a flush() on the AsyncMessageAcceptCommand?


bq.  On 2011-06-10 00:22:59, Kenneth Giusti wrote:
bq.  > /branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp, line 862
bq.  > <https://reviews.apache.org/r/860/diff/1/?file=20825#file20825line862>
bq.  >
bq.  >     There may be multiple pending dequeues for a given message, so we 
would need some sort of container anyway.

I'd prefer a counter to a container.


bq.  On 2011-06-10 00:22:59, Kenneth Giusti wrote:
bq.  > /branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp, line 876
bq.  > <https://reviews.apache.org/r/860/diff/1/?file=20825#file20825line876>
bq.  >
bq.  >     I'm not really happy with this approach, but I can't think of a way 
to track N outstanding dequeue operations without using some dynamic container.
bq.  >     
bq.  >     Each callback is actually a "functor" object that contains state 
needed to map the dequeue back to the pending Message.Accept.  This state impl 
is hidden from the Message object - I'd hate to have to expose command-specific 
state into the messages (but I'm probably not thinking abstractly enough at 
this point :P )

As long as the command-specific state is encapsulated in its own class, I think 
it's fine to attach it to the message. In fact I think we need a general 
solution for "feature x has per-message state"  that avoids keeping a map of 
all the messages. Not for this patch though :)


bq.  On 2011-06-10 00:22:59, Kenneth Giusti wrote:
bq.  > /branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp, line 930
bq.  > <https://reviews.apache.org/r/860/diff/1/?file=20825#file20825line930>
bq.  >
bq.  >     The behaviour of the unacked list should be the same as it is now.  
I'm more worried about the completion of the Message.Accept cmd, which will 
happen after the store has completed dequeue.  That will be a visible change in 
behaviour and I don't think clustering will like it.

Completion of the accept is sent in a control, not a command. So it doesn't 
disturb the command-ids of outgoing transfers. Need to review  if there's any 
potential for trouble with the rest of the Session state. Gordon?


- Alan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/860/#review797
-----------------------------------------------------------


On 2011-06-08 20:31:05, Kenneth Giusti wrote:
bq.  
bq.  -----------------------------------------------------------
bq.  This is an automatically generated e-mail. To reply, visit:
bq.  https://reviews.apache.org/r/860/
bq.  -----------------------------------------------------------
bq.  
bq.  (Updated 2011-06-08 20:31:05)
bq.  
bq.  
bq.  Review request for qpid, Alan Conway, Gordon Sim, and Kim van der Riet.
bq.  
bq.  
bq.  Summary
bq.  -------
bq.  
bq.  Modifies the broker's handling of Message.Accept to hold off the 
completion of the command until all messages related to the accept have 
completed dequeue.  This particularly applies to persistent messages, as the 
store::dequeue() operation must complete before the message is considered fully 
dequeued.
bq.  
bq.  Note this bugfix requires some changes to the broker's store module 
interface:  previously, the store only identified the message when a dequeue 
was completed.  This is not enough information - the queue from which is was 
removed must also be identified (the message may be in the process of being 
dequeued on several queues at once).
bq.  
bq.  
bq.  This addresses bug qpid-3079.
bq.      https://issues.apache.org/jira/browse/qpid-3079
bq.  
bq.  
bq.  Diffs
bq.  -----
bq.  
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/AsyncCompletion.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/Message.cpp 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStore.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.h 
1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp 
1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/NullMessageStore.cpp 
1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.h 
1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableMessage.cpp 
1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/PersistableQueue.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoverableQueue.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp 
1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionContext.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.h 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/qpid/broker/SessionState.cpp 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/tests/AsyncCompletion.cpp 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/tests/QueueTest.cpp 1124895 
bq.    /branches/qpid-3079/qpid/cpp/src/tests/TestMessageStore.h 1124895 
bq.  
bq.  Diff: https://reviews.apache.org/r/860/diff
bq.  
bq.  
bq.  Testing
bq.  -------
bq.  
bq.  broker unit tests, store unit tests (modified jboss store).   Still needs 
to be vetted on non-linux, and have latest trunk merged in.
bq.  
bq.  
bq.  Thanks,
bq.  
bq.  Kenneth
bq.  
bq.



> message.accept command should be completed on a per-dequeue basis
> -----------------------------------------------------------------
>
>                 Key: QPID-3079
>                 URL: https://issues.apache.org/jira/browse/QPID-3079
>             Project: Qpid
>          Issue Type: Bug
>          Components: C++ Broker
>    Affects Versions: 0.8, 0.9
>            Reporter: Ken Giusti
>            Assignee: Ken Giusti
>             Fix For: 0.11
>
>         Attachments: proposal.txt
>
>
> ** Overview
> Asynchronous completion means that command execution is initiated in one 
> thread
> (a client connection thread) and completed in a different thread.
> When the async store is loaded, message.transfer commands are
> completed by a store thread that does the async write.
> ** Issues with asynchronous completion code as of revision r1029686
> *** Not really asynchronous
> IncompleteMessageList::process blocks the connection thread till all
> outstanding async commands (messages) for the session are complete.
> With the new cluster, this could deadlock since it is blocking a Poller 
> thread.
> *** Race condition for message.transfer
>     
> Sketch of the current code:
> // Called in connection thread 
> PersistableMessage::enqueueAsync { ++counter; } 
> // Called in store thread once message is written.
> PersistableMessage::enqueueComplete { if (--counter == 0) notifyCompleted(); }
> The intent is that notify be called once per message, after the
> message has been written to each queue it was routed to.
> However of a message is routed to N queues, it's possible for
> notifyCompleted to be called up to N times. The store thread could
> call notifyCompleted for the first queue before the connection thread
> has called enqueueAsync for the second queue, and so on.
> *** No asynchronous completion for message.accept
> We do not currently delay completion of message.accept until the
> message is deleted from the async store. This could cause duplicate
> delivery if the broker crashes after completing the message but 
> before it is removed from store.
> There is code in PersistableMessage to maintain a counter for dequeues
> analogous to to the async enqueue code but this is incorrect. 
> Completion of transfer is triggered when all enqueues for a message are 
> complete.
> Completion of accept is triggered for *each* dequeue from a queue 
> independently.
> Furthermore a single accept can reference many messages, so it can't be 
> associated with a message.
> ** New requirements
> The new cluster design will need to participate in async completion, e.g.
> an accept cannot be comlpeted until the message is 
> - removed from store (if present) AND
> - replicated to the cluster (if present) as dequeued
> The new cluster also needs to asynchronously complete binding commands
> (declare, bind, delete) when they are replicated to the cluster.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscr...@qpid.apache.org

Reply via email to