This is an automated email from the ASF dual-hosted git repository. hanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push: new dd96bec ZOOKEEPER-3484: Improve the throughput by optimizing the synchronizat… dd96bec is described below commit dd96bec98db2b747958f9bcd0ea9f64d43eac17e Author: Yisong Yue <yisong...@fb.com> AuthorDate: Mon Sep 9 19:26:38 2019 -0700 ZOOKEEPER-3484: Improve the throughput by optimizing the synchronizat… …ion around outstandingChanges Refactored the `synchronized(outstandingChanges)` block into ZooKeeperServer.java, and return early for read requests. Author: Yisong Yue <yisong...@fb.com> Reviewers: maoling <maoling199210...@sina.com>, Michael Han <h...@apache.org> Closes #1042 from yisong-yue/ZOOKEEPER-3484 --- .../zookeeper/server/FinalRequestProcessor.java | 29 +---------- .../apache/zookeeper/server/ZooKeeperServer.java | 57 ++++++++++++++++++---- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index ec9cdf1..faccc36 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -72,11 +72,9 @@ import org.apache.zookeeper.proto.SetWatches; import org.apache.zookeeper.proto.SyncRequest; import org.apache.zookeeper.proto.SyncResponse; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; -import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord; import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer; import org.apache.zookeeper.server.util.RequestPathMetricsCollector; import org.apache.zookeeper.txn.ErrorTxn; -import org.apache.zookeeper.txn.TxnHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,33 +111,8 @@ public class FinalRequestProcessor implements RequestProcessor { if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'E', request, ""); } - ProcessTxnResult rc = null; - synchronized (zks.outstandingChanges) { - // Need to process local session requests - rc = zks.processTxn(request); - - // request.hdr is set for write requests, which are the only ones - // that add to outstandingChanges. - if (request.getHdr() != null) { - TxnHeader hdr = request.getHdr(); - long zxid = hdr.getZxid(); - while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) { - ChangeRecord cr = zks.outstandingChanges.remove(); - ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1); - if (cr.zxid < zxid) { - LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid); - } - if (zks.outstandingChangesForPath.get(cr.path) == cr) { - zks.outstandingChangesForPath.remove(cr.path); - } - } - } - // do not add non quorum packets to the queue. - if (request.isQuorum()) { - zks.getZKDatabase().addCommittedProposal(request); - } - } + ProcessTxnResult rc = zks.processTxn(request); // ZOOKEEPER-558: // In some cases the server does not close the connection (e.g., closeconn buffer diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 95aaed3..c623b87 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1512,18 +1512,54 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // entry point for quorum/Learner.java public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { - return processTxn(null, hdr, txn); + processTxnForSessionEvents(null, hdr, txn); + return processTxnInDB(hdr, txn); } // entry point for FinalRequestProcessor.java public ProcessTxnResult processTxn(Request request) { - return processTxn(request, request.getHdr(), request.getTxn()); + TxnHeader hdr = request.getHdr(); + processTxnForSessionEvents(request, hdr, request.getTxn()); + + final boolean writeRequest = (hdr != null); + final boolean quorumRequest = request.isQuorum(); + + // return fast w/o synchronization when we get a read + if (!writeRequest && !quorumRequest) { + return new ProcessTxnResult(); + } + synchronized (outstandingChanges) { + ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn()); + + // request.hdr is set for write requests, which are the only ones + // that add to outstandingChanges. + if (writeRequest) { + long zxid = hdr.getZxid(); + while (!outstandingChanges.isEmpty() + && outstandingChanges.peek().zxid <= zxid) { + ChangeRecord cr = outstandingChanges.remove(); + ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1); + if (cr.zxid < zxid) { + LOG.warn("Zxid outstanding " + cr.zxid + + " is less than current " + zxid); + } + if (outstandingChangesForPath.get(cr.path) == cr) { + outstandingChangesForPath.remove(cr.path); + } + } + } + + // do not add non quorum packets to the queue. + if (quorumRequest) { + getZKDatabase().addCommittedProposal(request); + } + return rc; + } } - private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) { - ProcessTxnResult rc; - int opCode = request != null ? request.type : hdr.getType(); - long sessionId = request != null ? request.sessionId : hdr.getClientId(); + private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { + int opCode = (request == null) ? hdr.getType() : request.type; + long sessionId = (request == null) ? hdr.getClientId() : request.sessionId; if (opCode == OpCode.createSession) { if (hdr != null && txn instanceof CreateSessionTxn) { @@ -1535,13 +1571,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } + } - if (hdr != null) { - rc = getZKDatabase().processTxn(hdr, txn); + private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn) { + if (hdr == null) { + return new ProcessTxnResult(); } else { - rc = new ProcessTxnResult(); + return getZKDatabase().processTxn(hdr, txn); } - return rc; } public Map<Long, Set<Long>> getSessionExpiryMap() {