[ https://issues.apache.org/jira/browse/ZOOKEEPER-2807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279351#comment-16279351 ]

ASF GitHub Bot commented on ZOOKEEPER-2807:
-------------------------------------------

Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/300#discussion_r155103949
  
    --- Diff: src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java ---
    @@ -240,84 +240,14 @@ public void run() {
                         }
     
                         // Process committed head
    -                    if ((request = committedRequests.poll()) == null) {
    -                        throw new IOException("Error: committed head is null");
    -                    }
    -
    -                    /*
    -                     * Check if request is pending, if so, update it with the committed info
    -                     */
    -                    LinkedList<Request> sessionQueue = pendingRequests
    -                            .get(request.sessionId);
    -                    if (sessionQueue != null) {
    -                        // If session queue != null, then it is also not empty.
    -                        Request topPending = sessionQueue.poll();
    -                        if (request.cxid != topPending.cxid) {
    -                            /*
    -                             * TL;DR - we should not encounter this scenario often under normal load.
    -                             * We pass the commit to the next processor and put the pending back with a warning.
    -                             *
    -                             * Generally, we can get commit requests that are not at the queue head after
    -                             * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session
    -                             * with A, and the server that the session moved to with B (keep in mind that it is
    -                             * possible that the session already moved from B to a new server C, and maybe C=A).
    -                             * 1. If request.cxid < topPending.cxid : this means that the session requested this update
    -                             * from A, then moved to B (i.e., which is us), and now B receives the commit
    -                             * for the update after the session already performed several operations in B
    -                             * (and therefore its cxid is higher than that old request).
    -                             * 2. If request.cxid > topPending.cxid : this means that the session requested an updated
    -                             * from B with cxid that is bigger than the one we know therefore in this case we
    -                             * are A, and we lost the connection to the session. Given that we are waiting for a commit
    -                             * for that update, it means that we already sent the request to the leader and it will
    -                             * be committed at some point (in this case the order of cxid won't follow zxid, since zxid
    -                             * is an increasing order). It is not safe for us to delete the session's queue at this
    -                             * point, since it is possible that the session has newer requests in it after it moved
    -                             * back to us. We just leave the queue as it is, and once the commit arrives (for the old
    -                             * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a
    -                             * response.
    -                             * Also note that we don't have a local session, therefore we treat the request
    -                             * like any other commit for a remote request, i.e., we perform the update without sending
    -                             * a response.
    -                             */
    -                            LOG.warn("Got request " + request +
    -                                    " but we are expecting request " + topPending);
    -                            sessionQueue.addFirst(topPending);
    -                        } else {
    -                            /*
    -                             * Generally, we want to send to the next processor our version of the request,
    -                             * since it contains the session information that is needed for post update processing.
    -                             * In more details, when a request is in the local queue, there is (or could be) a client
    -                             * attached to this server waiting for a response, and there is other bookkeeping of
    -                             * requests that are outstanding and have originated from this server
    -                             * (e.g., for setting the max outstanding requests) - we need to update this info when an
    -                             * outstanding request completes. Note that in the other case (above), the operation
    -                             * originated from a different server and there is no local bookkeeping or a local client
    -                             * session that needs to be notified.
    -                             */
    -                            topPending.setHdr(request.getHdr());
    -                            topPending.setTxn(request.getTxn());
    -                            topPending.zxid = request.zxid;
    -                            request = topPending;
    -                        }
    -                    }
    -
    -                    sendToNextProcessor(request);
    -
    -                    waitForEmptyPool();
    -
    -                    /*
    -                     * Process following reads if any, remove session queue if
    -                     * empty.
    -                     */
    -                    if (sessionQueue != null) {
    -                        while (!stopped && !sessionQueue.isEmpty()
    -                                && !needCommit(sessionQueue.peek())) {
    -                            sendToNextProcessor(sessionQueue.poll());
    -                        }
    -                        // Remove empty queues
    -                        if (sessionQueue.isEmpty()) {
    -                            pendingRequests.remove(request.sessionId);
    +                    // We only need to perform synchronization if we are on the last request in the queue
    +                    if (committedRequests.size() == 1) {
    +                        synchronized (committedRequests) {
    --- End diff --
    
    Ok



> Flaky test: org.apache.zookeeper.test.WatchEventWhenAutoResetTest.testNodeDataChanged
> -------------------------------------------------------------------------------------
>
>                 Key: ZOOKEEPER-2807
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-2807
>             Project: ZooKeeper
>          Issue Type: Bug
>            Reporter: Abraham Fine
>            Assignee: Abraham Fine
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
