This is an automated email from the ASF dual-hosted git repository.

hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dd96bec  ZOOKEEPER-3484: Improve the throughput by optimizing the synchronization around outstandingChanges
dd96bec is described below

commit dd96bec98db2b747958f9bcd0ea9f64d43eac17e
Author: Yisong Yue <yisong...@fb.com>
AuthorDate: Mon Sep 9 19:26:38 2019 -0700

    ZOOKEEPER-3484: Improve the throughput by optimizing the synchronization around outstandingChanges
    
    Moved the `synchronized(outstandingChanges)` block from FinalRequestProcessor.java into ZooKeeperServer.java, and return early, without taking the lock, for read requests.
    
    Author: Yisong Yue <yisong...@fb.com>
    
    Reviewers: maoling <maoling199210...@sina.com>, Michael Han <h...@apache.org>
    
    Closes #1042 from yisong-yue/ZOOKEEPER-3484
---
 .../zookeeper/server/FinalRequestProcessor.java    | 29 +----------
 .../apache/zookeeper/server/ZooKeeperServer.java   | 57 ++++++++++++++++++----
 2 files changed, 48 insertions(+), 38 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index ec9cdf1..faccc36 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -72,11 +72,9 @@ import org.apache.zookeeper.proto.SetWatches;
 import org.apache.zookeeper.proto.SyncRequest;
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
-import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
 import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
 import org.apache.zookeeper.txn.ErrorTxn;
-import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,33 +111,8 @@ public class FinalRequestProcessor implements RequestProcessor {
         if (LOG.isTraceEnabled()) {
             ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
         }
-        ProcessTxnResult rc = null;
-        synchronized (zks.outstandingChanges) {
-            // Need to process local session requests
-            rc = zks.processTxn(request);
-
-            // request.hdr is set for write requests, which are the only ones
-            // that add to outstandingChanges.
-            if (request.getHdr() != null) {
-                TxnHeader hdr = request.getHdr();
-                long zxid = hdr.getZxid();
-                while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) {
-                    ChangeRecord cr = zks.outstandingChanges.remove();
-                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
-                    if (cr.zxid < zxid) {
-                        LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid);
-                    }
-                    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
-                        zks.outstandingChangesForPath.remove(cr.path);
-                    }
-                }
-            }
 
-            // do not add non quorum packets to the queue.
-            if (request.isQuorum()) {
-                zks.getZKDatabase().addCommittedProposal(request);
-            }
-        }
+        ProcessTxnResult rc = zks.processTxn(request);
 
         // ZOOKEEPER-558:
         // In some cases the server does not close the connection (e.g., closeconn buffer
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 95aaed3..c623b87 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1512,18 +1512,54 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
     // entry point for quorum/Learner.java
     public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
-        return processTxn(null, hdr, txn);
+        processTxnForSessionEvents(null, hdr, txn);
+        return processTxnInDB(hdr, txn);
     }
 
     // entry point for FinalRequestProcessor.java
     public ProcessTxnResult processTxn(Request request) {
-        return processTxn(request, request.getHdr(), request.getTxn());
+        TxnHeader hdr = request.getHdr();
+        processTxnForSessionEvents(request, hdr, request.getTxn());
+
+        final boolean writeRequest = (hdr != null);
+        final boolean quorumRequest = request.isQuorum();
+
+        // return fast w/o synchronization when we get a read
+        if (!writeRequest && !quorumRequest) {
+            return new ProcessTxnResult();
+        }
+        synchronized (outstandingChanges) {
+            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn());
+
+            // request.hdr is set for write requests, which are the only ones
+            // that add to outstandingChanges.
+            if (writeRequest) {
+                long zxid = hdr.getZxid();
+                while (!outstandingChanges.isEmpty()
+                        && outstandingChanges.peek().zxid <= zxid) {
+                    ChangeRecord cr = outstandingChanges.remove();
+                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
+                    if (cr.zxid < zxid) {
+                        LOG.warn("Zxid outstanding " + cr.zxid
+                                + " is less than current " + zxid);
+                    }
+                    if (outstandingChangesForPath.get(cr.path) == cr) {
+                        outstandingChangesForPath.remove(cr.path);
+                    }
+                }
+            }
+
+            // do not add non quorum packets to the queue.
+            if (quorumRequest) {
+                getZKDatabase().addCommittedProposal(request);
+            }
+            return rc;
+        }
     }
 
-    private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) {
-        ProcessTxnResult rc;
-        int opCode = request != null ? request.type : hdr.getType();
-        long sessionId = request != null ? request.sessionId : hdr.getClientId();
+    private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
+        int opCode = (request == null) ? hdr.getType() : request.type;
+        long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
 
         if (opCode == OpCode.createSession) {
             if (hdr != null && txn instanceof CreateSessionTxn) {
@@ -1535,13 +1571,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         } else if (opCode == OpCode.closeSession) {
             sessionTracker.removeSession(sessionId);
         }
+    }
 
-        if (hdr != null) {
-            rc = getZKDatabase().processTxn(hdr, txn);
+    private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn) {
+        if (hdr == null) {
+            return new ProcessTxnResult();
         } else {
-            rc = new ProcessTxnResult();
+            return getZKDatabase().processTxn(hdr, txn);
         }
-        return rc;
     }
 
     public Map<Long, Set<Long>> getSessionExpiryMap() {

Reply via email to