This is an automated email from the ASF dual-hosted git repository.
hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new dd96bec ZOOKEEPER-3484: Improve the throughput by optimizing the
synchronization around outstandingChanges
dd96bec is described below
commit dd96bec98db2b747958f9bcd0ea9f64d43eac17e
Author: Yisong Yue <[email protected]>
AuthorDate: Mon Sep 9 19:26:38 2019 -0700
ZOOKEEPER-3484: Improve the throughput by optimizing the synchronization
around outstandingChanges
Moved the `synchronized(outstandingChanges)` block from
FinalRequestProcessor.java into ZooKeeperServer.java, and made read
requests return early without entering the synchronized block.
Author: Yisong Yue <[email protected]>
Reviewers: maoling <[email protected]>, Michael Han
<[email protected]>
Closes #1042 from yisong-yue/ZOOKEEPER-3484
---
.../zookeeper/server/FinalRequestProcessor.java | 29 +----------
.../apache/zookeeper/server/ZooKeeperServer.java | 57 ++++++++++++++++++----
2 files changed, 48 insertions(+), 38 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index ec9cdf1..faccc36 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -72,11 +72,9 @@ import org.apache.zookeeper.proto.SetWatches;
import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
-import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
import org.apache.zookeeper.txn.ErrorTxn;
-import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,33 +111,8 @@ public class FinalRequestProcessor implements
RequestProcessor {
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
- ProcessTxnResult rc = null;
- synchronized (zks.outstandingChanges) {
- // Need to process local session requests
- rc = zks.processTxn(request);
-
- // request.hdr is set for write requests, which are the only ones
- // that add to outstandingChanges.
- if (request.getHdr() != null) {
- TxnHeader hdr = request.getHdr();
- long zxid = hdr.getZxid();
- while (!zks.outstandingChanges.isEmpty() &&
zks.outstandingChanges.peek().zxid <= zxid) {
- ChangeRecord cr = zks.outstandingChanges.remove();
-
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
- if (cr.zxid < zxid) {
- LOG.warn("Zxid outstanding " + cr.zxid + " is less
than current " + zxid);
- }
- if (zks.outstandingChangesForPath.get(cr.path) == cr) {
- zks.outstandingChangesForPath.remove(cr.path);
- }
- }
- }
- // do not add non quorum packets to the queue.
- if (request.isQuorum()) {
- zks.getZKDatabase().addCommittedProposal(request);
- }
- }
+ ProcessTxnResult rc = zks.processTxn(request);
// ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g.,
closeconn buffer
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 95aaed3..c623b87 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1512,18 +1512,54 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
// entry point for quorum/Learner.java
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
- return processTxn(null, hdr, txn);
+ processTxnForSessionEvents(null, hdr, txn);
+ return processTxnInDB(hdr, txn);
}
// entry point for FinalRequestProcessor.java
public ProcessTxnResult processTxn(Request request) {
- return processTxn(request, request.getHdr(), request.getTxn());
+ TxnHeader hdr = request.getHdr();
+ processTxnForSessionEvents(request, hdr, request.getTxn());
+
+ final boolean writeRequest = (hdr != null);
+ final boolean quorumRequest = request.isQuorum();
+
+ // return fast w/o synchronization when we get a read
+ if (!writeRequest && !quorumRequest) {
+ return new ProcessTxnResult();
+ }
+ synchronized (outstandingChanges) {
+ ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn());
+
+ // request.hdr is set for write requests, which are the only ones
+ // that add to outstandingChanges.
+ if (writeRequest) {
+ long zxid = hdr.getZxid();
+ while (!outstandingChanges.isEmpty()
+ && outstandingChanges.peek().zxid <= zxid) {
+ ChangeRecord cr = outstandingChanges.remove();
+
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
+ if (cr.zxid < zxid) {
+ LOG.warn("Zxid outstanding " + cr.zxid
+ + " is less than current " + zxid);
+ }
+ if (outstandingChangesForPath.get(cr.path) == cr) {
+ outstandingChangesForPath.remove(cr.path);
+ }
+ }
+ }
+
+ // do not add non quorum packets to the queue.
+ if (quorumRequest) {
+ getZKDatabase().addCommittedProposal(request);
+ }
+ return rc;
+ }
}
- private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record
txn) {
- ProcessTxnResult rc;
- int opCode = request != null ? request.type : hdr.getType();
- long sessionId = request != null ? request.sessionId :
hdr.getClientId();
+ private void processTxnForSessionEvents(Request request, TxnHeader hdr,
Record txn) {
+ int opCode = (request == null) ? hdr.getType() : request.type;
+ long sessionId = (request == null) ? hdr.getClientId() :
request.sessionId;
if (opCode == OpCode.createSession) {
if (hdr != null && txn instanceof CreateSessionTxn) {
@@ -1535,13 +1571,14 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
+ }
- if (hdr != null) {
- rc = getZKDatabase().processTxn(hdr, txn);
+ private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn) {
+ if (hdr == null) {
+ return new ProcessTxnResult();
} else {
- rc = new ProcessTxnResult();
+ return getZKDatabase().processTxn(hdr, txn);
}
- return rc;
}
public Map<Long, Set<Long>> getSessionExpiryMap() {