This is an automated email from the ASF dual-hosted git repository. hanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push: new a6b38f8 ZOOKEEPER-3359: Batch commits in the CommitProcessor a6b38f8 is described below commit a6b38f83218791f8d9fabc52c865dcccf07026cc Author: Brian Nixon <ni...@fb.com> AuthorDate: Mon Jul 29 17:31:09 2019 -0700 ZOOKEEPER-3359: Batch commits in the CommitProcessor Author: Brian Nixon <ni...@fb.com> Reviewers: Michael Han <h...@apache.org>, Norbert Kalmar <nkal...@yahoo.com>, Enrico Olivelli <eolive...@gmail.com> Closes #905 from enixon/commit-proc-batch --- .../src/main/resources/markdown/zookeeperAdmin.md | 23 ++ .../zookeeper/server/ZooKeeperServerBean.java | 13 + .../zookeeper/server/ZooKeeperServerMXBean.java | 6 + .../zookeeper/server/quorum/CommitProcessor.java | 318 ++++++++++++++------- .../quorum/CommitProcessorConcurrencyTest.java | 159 +++++++++++ .../server/quorum/CommitProcessorMetricsTest.java | 4 + 6 files changed, 422 insertions(+), 101 deletions(-) diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index c596546..835e357 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1445,6 +1445,29 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t Number of Commit Processor worker threads. If configured with 0 worker threads, the main thread will process the request directly. The default value is the number of cpu cores. +* *zookeeper.commitProcessor.maxReadBatchSize* : + (Java system property only: **zookeeper.commitProcessor.maxReadBatchSize**) + Max number of reads to process from queuedRequests before switching to processing commits. + If the value < 0 (default), we switch whenever we have a local write, and pending commits. + A high read batch size will delay commit processing, causing stale data to be served. 
+ If reads are known to arrive in fixed-size batches, then matching that batch size with + the value of this property can smooth queue performance. Since reads are handled in parallel, + one recommendation is to set this property to match *zookeeper.commitProcessor.numWorkerThreads* + (default is the number of cpu cores) or lower. + +* *zookeeper.commitProcessor.maxCommitBatchSize* : + (Java system property only: **zookeeper.commitProcessor.maxCommitBatchSize**) + Max number of commits to process before processing reads. We will try to process as many + remote/local commits as we can until we reach this count. A high commit batch size will delay + reads while processing more commits. A low commit batch size will favor reads. + It is recommended to only set this property when an ensemble is serving a workload with a high + commit rate. If writes are known to arrive in batches of a set size, then matching that + batch size with the value of this property can smooth queue performance. A generic + approach would be to set this value equal to the ensemble size so that, with the processing + of each batch, the current server will probabilistically handle a write related to one of + its direct clients. + Default is "1". Negative and zero values are not supported.
+ * *znode.container.checkIntervalMs* : (Java system property only) **New in 3.6.0:** The diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java index 92ceab8..6e89820 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java @@ -23,6 +23,7 @@ import java.util.Date; import org.apache.jute.BinaryInputArchive; import org.apache.zookeeper.Version; import org.apache.zookeeper.jmx.ZKMBeanInfo; +import org.apache.zookeeper.server.quorum.CommitProcessor; /** * This class implements the ZooKeeper server MBean interface. @@ -281,6 +282,18 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo { /////////////////////////////////////////////////////////////////////////// + public int getCommitProcMaxReadBatchSize() { return CommitProcessor.getMaxReadBatchSize(); } + + public void setCommitProcMaxReadBatchSize(int size) { CommitProcessor.setMaxReadBatchSize(size); } + + /////////////////////////////////////////////////////////////////////////// + + public int getCommitProcMaxCommitBatchSize() { return CommitProcessor.getMaxCommitBatchSize(); } + + public void setCommitProcMaxCommitBatchSize(int size) { CommitProcessor.setMaxCommitBatchSize(size);} + + /////////////////////////////////////////////////////////////////////////// + @Override public long getFlushDelay() { return zks.getFlushDelay(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java index 4c71eac..7a42eaa 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java @@ -120,6 
+120,12 @@ public interface ZooKeeperServerMXBean { public double getConnectionDecreaseRatio(); public void setConnectionDecreaseRatio(double val); + public int getCommitProcMaxReadBatchSize(); + public void setCommitProcMaxReadBatchSize(int size); + + public int getCommitProcMaxCommitBatchSize(); + public void setCommitProcMaxCommitBatchSize(int size); + public int getRequestThrottleLimit(); public void setRequestThrottleLimit(int requests); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java index 6e4b702..77635c9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java @@ -19,11 +19,12 @@ package org.apache.zookeeper.server.quorum; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashMap; import java.util.Map; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.LinkedBlockingQueue; @@ -82,6 +83,12 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements /** Default worker pool shutdown timeout in ms: 5000 (5s) */ public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT = "zookeeper.commitProcessor.shutdownTimeout"; + /** Default max read batch size: -1 to disable the feature */ + public static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE = + "zookeeper.commitProcessor.maxReadBatchSize"; + /** Default max commit batch size: 1 */ + public static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE = + "zookeeper.commitProcessor.maxCommitBatchSize"; /** * Incoming requests. 
@@ -90,6 +97,13 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements new LinkedBlockingQueue<Request>(); /** + * Incoming requests that are waiting on a commit, + * contained in order of arrival + */ + protected final LinkedBlockingQueue<Request> queuedWriteRequests = + new LinkedBlockingQueue<>(); + + /** * The number of read requests currently held in all session queues */ private AtomicInteger numReadQueuedRequests = new AtomicInteger(0); @@ -126,6 +140,23 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements private Object emptyPoolSync = new Object(); /** + * Max number of reads to process from queuedRequests before switching to + * processing commits. If the value is negative, we switch whenever we have + * a local write, and pending commits. + * A high read batch size will delay commit processing causing us to + * serve stale data. + */ + private static volatile int maxReadBatchSize; + /** + * Max number of commits to process before processing reads. We will try to + * process as many remote/local commits as we can till we reach this + * count. + * A high commit batch size will delay reads while processing more commits. + * A low commit batch size will favor reads. + */ + private static volatile int maxCommitBatchSize; + + /** * This flag indicates whether we need to wait for a response to come back from the * leader or we just let the sync operation flow through like a read. The flag will * be false if the CommitProcessor is in a Leader pipeline. @@ -209,22 +240,28 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements /* * Processing up to requestsToProcess requests from the incoming - * queue (queuedRequests), possibly less if a committed request - * is present along with a pending local write. After the loop, - * we process one committed request if commitIsWaiting. + * queue (queuedRequests). 
If maxReadBatchSize is set then no + * commits will be processed until maxReadBatchSize number of + * reads are processed (or no more reads remain in the queue). + * After the loop a single committed request is processed if + * one is waiting (or a batch of commits if maxCommitBatchSize + * is set). */ - Request request = null; + Request request; + int readsProcessed = 0; while (!stopped && requestsToProcess > 0 + && (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize) && (request = queuedRequests.poll()) != null) { requestsToProcess--; if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) { // Add request to pending - pendingRequests - .computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>()) - .add(request); - ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(pendingRequests.get(request.sessionId).size()); + Deque<Request> requests = + pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>()); + requests.addLast(request); + ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size()); } else { + readsProcessed++; numReadQueuedRequests.decrementAndGet(); sendToNextProcessor(request); } @@ -237,9 +274,10 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements * the queue, so if we have a pending request and a * committed request, the committed request must be for that * pending write or for a write originating at a different - * server. + * server. We skip this if maxReadBatchSize is set. */ - if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){ + if (maxReadBatchSize < 0 && + !pendingRequests.isEmpty() && !committedRequests.isEmpty()){ /* * We set commitIsWaiting so that we won't check * committedRequests again. 
@@ -248,91 +286,111 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements break; } } + ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed); - // Handle a single committed request - if (commitIsWaiting && !stopped){ + if (!commitIsWaiting) { + commitIsWaiting = !committedRequests.isEmpty(); + } + + /* + * Handle commits, if any. + */ + if (commitIsWaiting && !stopped) { + /* + * Drain outstanding reads + */ waitForEmptyPool(); - if (stopped){ + if (stopped) { return; } - // Process committed head - if ((request = committedRequests.poll()) == null) { - throw new IOException("Error: committed head is null"); - } + int commitsToProcess = maxCommitBatchSize; /* - * Check if request is pending, if so, update it with the committed info + * Loop through all the commits, and try to drain them. */ - Deque<Request> sessionQueue = pendingRequests - .get(request.sessionId); - ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size()); - if (sessionQueue != null) { - ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size()); - // If session queue != null, then it is also not empty. - Request topPending = sessionQueue.poll(); - if (request.cxid != topPending.cxid) { - /* - * TL;DR - we should not encounter this scenario often under normal load. - * We pass the commit to the next processor and put the pending back with a warning. - * - * Generally, we can get commit requests that are not at the queue head after - * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session - * with A, and the server that the session moved to with B (keep in mind that it is - * possible that the session already moved from B to a new server C, and maybe C=A). - * 1. 
If request.cxid < topPending.cxid : this means that the session requested this update - * from A, then moved to B (i.e., which is us), and now B receives the commit - * for the update after the session already performed several operations in B - * (and therefore its cxid is higher than that old request). - * 2. If request.cxid > topPending.cxid : this means that the session requested an updated - * from B with cxid that is bigger than the one we know therefore in this case we - * are A, and we lost the connection to the session. Given that we are waiting for a commit - * for that update, it means that we already sent the request to the leader and it will - * be committed at some point (in this case the order of cxid won't follow zxid, since zxid - * is an increasing order). It is not safe for us to delete the session's queue at this - * point, since it is possible that the session has newer requests in it after it moved - * back to us. We just leave the queue as it is, and once the commit arrives (for the old - * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a - * response. - * Also note that we don't have a local session, therefore we treat the request - * like any other commit for a remote request, i.e., we perform the update without sending - * a response. - */ - LOG.warn("Got request " + request + - " but we are expecting request " + topPending); - sessionQueue.addFirst(topPending); - } else { + Set<Long> queuesToDrain = new HashSet<>(); + long startWriteTime = Time.currentElapsedTime(); + int commitsProcessed = 0; + while (commitIsWaiting && !stopped && commitsToProcess > 0) { + + // Process committed head + request = committedRequests.peek(); + + /* + * Check if this is a local write request is pending, + * if so, update it with the committed info. If the commit matches + * the first write queued in the blockedRequestQueue, we know this is + * a commit for a local write, as commits are received in order. 
Else + * it must be a commit for a remote write. + */ + if (!queuedWriteRequests.isEmpty() && + queuedWriteRequests.peek().sessionId == request.sessionId && + queuedWriteRequests.peek().cxid == request.cxid) { /* - * Generally, we want to send to the next processor our version of the request, - * since it contains the session information that is needed for post update processing. - * In more details, when a request is in the local queue, there is (or could be) a client - * attached to this server waiting for a response, and there is other bookkeeping of - * requests that are outstanding and have originated from this server - * (e.g., for setting the max outstanding requests) - we need to update this info when an - * outstanding request completes. Note that in the other case (above), the operation - * originated from a different server and there is no local bookkeeping or a local client - * session that needs to be notified. + * Commit matches the earliest write in our write queue. */ - topPending.setHdr(request.getHdr()); - topPending.setTxn(request.getTxn()); - topPending.zxid = request.zxid; - topPending.commitRecvTime = request.commitRecvTime; - request = topPending; - - // Only decrement if we take a request off the queue. - numWriteQueuedRequests.decrementAndGet(); + Deque<Request> sessionQueue = pendingRequests + .get(request.sessionId); + ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size()); + if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) { + /* + * Can't process this write yet. + * Either there are reads pending in this session, or we + * haven't gotten to this write yet. + */ + break; + } else { + ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size()); + // If session queue != null, then it is also not empty. 
+ Request topPending = sessionQueue.poll(); + /* + * Generally, we want to send to the next processor our version of the request, + * since it contains the session information that is needed for post update processing. + * In more details, when a request is in the local queue, there is (or could be) a client + * attached to this server waiting for a response, and there is other bookkeeping of + * requests that are outstanding and have originated from this server + * (e.g., for setting the max outstanding requests) - we need to update this info when an + * outstanding request completes. Note that in the other case, the operation + * originated from a different server and there is no local bookkeeping or a local client + * session that needs to be notified. + */ + topPending.setHdr(request.getHdr()); + topPending.setTxn(request.getTxn()); + topPending.zxid = request.zxid; + topPending.commitRecvTime = request.commitRecvTime; + request = topPending; + // Only decrement if we take a request off the queue. + numWriteQueuedRequests.decrementAndGet(); + queuedWriteRequests.poll(); + queuesToDrain.add(request.sessionId); + } } - } + /* + * Pull the request off the commit queue, now that we are going + * to process it. + */ + committedRequests.remove(); + commitsToProcess--; + commitsProcessed++; - sendToNextProcessor(request); - waitForEmptyPool(); + // Process the write inline. + processWrite(request); + + commitIsWaiting = !committedRequests.isEmpty(); + } + ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.add( + Time.currentElapsedTime() - startWriteTime); + ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed); /* - * Process following reads if any, remove session queue if + * Process following reads if any, remove session queue(s) if * empty. 
*/ - if (sessionQueue != null) { + readsProcessed = 0; + for (Long sessionId : queuesToDrain) { + Deque<Request> sessionQueue = pendingRequests.get(sessionId); int readsAfterWrite = 0; while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) { @@ -341,12 +399,15 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements readsAfterWrite++; } ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite); + readsProcessed += readsAfterWrite; // Remove empty queues if (sessionQueue.isEmpty()) { - pendingRequests.remove(request.sessionId); + pendingRequests.remove(sessionId); } } + ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size()); + ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed); } ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time); @@ -388,6 +449,8 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements workerShutdownTimeoutMS = Long.getLong( ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000); + initBatchSizes(); + LOG.info("Configuring CommitProcessor with " + (numWorkerThreads > 0 ? 
numWorkerThreads : "no") + " worker threads."); @@ -409,6 +472,78 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements workerPool.schedule(new CommitWorkRequest(request), request.sessionId); } + private void processWrite(Request request) throws RequestProcessorException { + processCommitMetrics(request, true); + + long timeBeforeFinalProc = Time.currentElapsedTime(); + nextProcessor.processRequest(request); + ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add( + Time.currentElapsedTime() - timeBeforeFinalProc); + } + + private static void initBatchSizes() { + maxReadBatchSize = Integer.getInteger( + ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE, -1); + maxCommitBatchSize = Integer.getInteger( + ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE, 1); + + if (maxCommitBatchSize <= 0) { + String errorMsg = "maxCommitBatchSize must be positive, was " + + maxCommitBatchSize; + throw new IllegalArgumentException(errorMsg); + } + + LOG.info("Configuring CommitProcessor with readBatchSize {} commitBatchSize {}", + maxReadBatchSize, + maxCommitBatchSize); + } + + private static void processCommitMetrics(Request request, boolean isWrite) { + if (isWrite) { + if (request.commitProcQueueStartTime != -1 && + request.commitRecvTime != -1) { + // Locally issued writes. + long currentTime = Time.currentElapsedTime(); + ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime - + request.commitProcQueueStartTime); + ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime - + request.commitRecvTime); + } else if (request.commitRecvTime != -1) { + // Writes issued by other servers. 
+ ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add( + Time.currentElapsedTime() - request.commitRecvTime); + } + } else { + if (request.commitProcQueueStartTime != -1) { + ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add( + Time.currentElapsedTime() - + request.commitProcQueueStartTime); + } + } + } + + public static int getMaxReadBatchSize() { + return maxReadBatchSize; + } + + public static int getMaxCommitBatchSize() { + return maxCommitBatchSize; + } + + public static void setMaxReadBatchSize(int size) { + maxReadBatchSize = size; + LOG.info("Configuring CommitProcessor with readBatchSize {}", + maxReadBatchSize); + } + + public static void setMaxCommitBatchSize(int size) { + if (size > 0) { + maxCommitBatchSize = size; + LOG.info("Configuring CommitProcessor with commitBatchSize {}", + maxCommitBatchSize); + } + } + /** * CommitWorkRequest is a small wrapper class to allow * downstream processing to be run using the WorkerService @@ -431,27 +566,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements public void doWork() throws RequestProcessorException { try { - if (needCommit(request)) { - if (request.commitProcQueueStartTime != -1 && - request.commitRecvTime != -1) { - // Locally issued writes. - long currentTime = Time.currentElapsedTime(); - ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime - - request.commitProcQueueStartTime); - ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime - - request.commitRecvTime); - } else if (request.commitRecvTime != -1) { - // Writes issued by other servers. 
- ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add( - Time.currentElapsedTime() - request.commitRecvTime); - } - } else { - if (request.commitProcQueueStartTime != -1) { - ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add( - Time.currentElapsedTime() - - request.commitProcQueueStartTime); - } - } + processCommitMetrics(request, needCommit(request)); long timeBeforeFinalProc = Time.currentElapsedTime(); nextProcessor.processRequest(request); @@ -508,6 +623,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements queuedRequests.add(request); // If the request will block, add it to the queue of blocking requests if (needCommit(request)) { + queuedWriteRequests.add(request); numWriteQueuedRequests.incrementAndGet(); } else { numReadQueuedRequests.incrementAndGet(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java index 0e002b9..95889ef 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java @@ -62,6 +62,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { public void setUp() throws Exception { processedRequests = new LinkedBlockingQueue<Request>(); processor = new MockCommitProcessor(); + CommitProcessor.setMaxReadBatchSize(-1); + CommitProcessor.setMaxCommitBatchSize(1); } @After @@ -148,6 +150,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.committedRequests.add(writeReq); processor.queuedRequests.add(readReq); processor.queuedRequests.add(writeReq); + processor.queuedWriteRequests.add(writeReq); processor.initThreads(1); processor.stoppedMainLoop = true; @@ -194,6 +197,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { Request readReq = 
newRequest(new GetDataRequest(path, false), OpCode.getData, sessionId, sessionId + 2); processor.queuedRequests.add(writeReq); + processor.queuedWriteRequests.add(writeReq); processor.queuedRequests.add(readReq); shouldNotBeProcessed.add(writeReq); shouldNotBeProcessed.add(readReq); @@ -232,6 +236,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, 0x1, 1); processor.queuedRequests.add(writeReq); + processor.queuedWriteRequests.add(writeReq); shouldBeInPending.add(writeReq); for (int readReqId = 2; readReqId <= 5; ++readReqId) { @@ -249,6 +254,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processedRequests.isEmpty()); Assert.assertTrue("Did not handled all of queuedRequests' requests", processor.queuedRequests.isEmpty()); + Assert.assertTrue("Removed from blockedQueuedRequests before commit", + !processor.queuedWriteRequests.isEmpty()); shouldBeInPending .removeAll(processor.pendingRequests.get(writeReq.sessionId)); @@ -273,6 +280,155 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { processor.committedRequests.isEmpty()); Assert.assertTrue("Did not process committed request", processor.pendingRequests.isEmpty()); + Assert.assertTrue("Did not remove from blockedQueuedRequests", + processor.queuedWriteRequests.isEmpty()); + } + + /** + * In the following test, we add a write request followed by several read + * requests of the same session. We will do this for 2 sessions. For the + * second session, we will queue up another write after the reads, and + * we verify several things - 1. The writes are not processed until + * the commits arrive. 2. Only 2 writes are processed, with maxCommitBatchSize + * of 3, due to the blocking reads. 3. Once the writes are processed, + * all the read requests are processed as well. 4. All read requests are + * executed after the write, before any other write for that session, + * along with new reads. 5. 
Then we add another read for session 1, and + * another write and commit for session 2. 6. Only the old write, and the read + * are processed, leaving the commit in the queue. 7. Last write is executed + * in the last iteration, and all lists are empty. + */ + @Test + public void processAllWritesMaxBatchSize() + throws Exception { + final String path = "/processAllWritesMaxBatchSize"; + HashSet<Request> shouldBeProcessedAfterPending = new HashSet<Request>(); + + Request writeReq = newRequest( + new CreateRequest(path + "_1", new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, 0x1, 1); + processor.queuedRequests.add(writeReq); + processor.queuedWriteRequests.add(writeReq); + + Request writeReq2 = newRequest( + new CreateRequest(path + "_2", new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, 0x2, 1); + processor.queuedRequests.add(writeReq2); + processor.queuedWriteRequests.add(writeReq2); + + for (int readReqId = 2; readReqId <= 5; ++readReqId) { + Request readReq = newRequest(new GetDataRequest(path, false), + OpCode.getData, 0x1, readReqId); + Request readReq2 = newRequest(new GetDataRequest(path, false), + OpCode.getData, 0x2, readReqId); + processor.queuedRequests.add(readReq); + shouldBeProcessedAfterPending.add(readReq); + processor.queuedRequests.add(readReq2); + shouldBeProcessedAfterPending.add(readReq2); + } + + Request writeReq3 = newRequest( + new CreateRequest(path + "_3", new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, 0x2, 6); + processor.queuedRequests.add(writeReq3); + processor.queuedWriteRequests.add(writeReq3); + + processor.initThreads(defaultSizeOfThreadPool); + + processor.stoppedMainLoop = true; + CommitProcessor.setMaxCommitBatchSize(2); + processor.run(); + Assert.assertTrue("Processed without waiting for commit", + processedRequests.isEmpty()); + Assert.assertTrue("Did not handled all of 
queuedRequests' requests", + processor.queuedRequests.isEmpty()); + Assert.assertTrue("Removed from blockedQueuedRequests before commit", + !processor.queuedWriteRequests.isEmpty()); + Assert.assertTrue("Missing session 1 in pending queue", + processor.pendingRequests.containsKey(writeReq.sessionId)); + Assert.assertTrue("Missing session 2 in pending queue", + processor.pendingRequests.containsKey(writeReq2.sessionId)); + + processor.committedRequests.add(writeReq); + processor.committedRequests.add(writeReq2); + processor.committedRequests.add(writeReq3); + processor.stoppedMainLoop = true; + CommitProcessor.setMaxCommitBatchSize(3); + processor.run(); + processor.initThreads(defaultSizeOfThreadPool); + + Thread.sleep(500); + Assert.assertTrue("Did not process committed request", + processedRequests.peek() == writeReq); + Assert.assertTrue("Did not process following read request", + processedRequests.containsAll(shouldBeProcessedAfterPending)); + Assert.assertTrue("Processed committed request", + !processor.committedRequests.isEmpty()); + Assert.assertTrue("Removed commit for write req 3", + processor.committedRequests.peek() == writeReq3); + Assert.assertTrue("Processed committed request", + !processor.pendingRequests.isEmpty()); + Assert.assertTrue("Missing session 2 in pending queue", + processor.pendingRequests.containsKey(writeReq3.sessionId)); + Assert.assertTrue("Missing write 3 in pending queue", + processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3); + Assert.assertTrue("Removed from blockedQueuedRequests", + !processor.queuedWriteRequests.isEmpty()); + Assert.assertTrue("Removed write req 3 from blockedQueuedRequests", + processor.queuedWriteRequests.peek() == writeReq3); + + Request readReq3 = newRequest(new GetDataRequest(path, false), + OpCode.getData, 0x1, 7); + processor.queuedRequests.add(readReq3); + shouldBeProcessedAfterPending.add(readReq3); + Request writeReq4 = newRequest( + new CreateRequest(path + "_4", new byte[0], 
Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, 0x2, 7); + + processor.queuedRequests.add(writeReq4); + processor.queuedWriteRequests.add(writeReq4); + processor.committedRequests.add(writeReq4); + + processor.stoppedMainLoop = true; + CommitProcessor.setMaxCommitBatchSize(3); + processor.run(); + processor.initThreads(defaultSizeOfThreadPool); + + Thread.sleep(500); + Assert.assertTrue("Did not process committed request", + processedRequests.peek() == writeReq); + Assert.assertTrue("Did not process following read request", + processedRequests.containsAll(shouldBeProcessedAfterPending)); + Assert.assertTrue("Processed unexpected committed request", + !processor.committedRequests.isEmpty()); + Assert.assertTrue("Unexpected pending request", + processor.pendingRequests.isEmpty()); + Assert.assertTrue("Removed from blockedQueuedRequests", + !processor.queuedWriteRequests.isEmpty()); + Assert.assertTrue("Removed write req 4 from blockedQueuedRequests", + processor.queuedWriteRequests.peek() == writeReq4); + + processor.stoppedMainLoop = true; + CommitProcessor.setMaxCommitBatchSize(3); + processor.run(); + processor.initThreads(defaultSizeOfThreadPool); + + Thread.sleep(500); + Assert.assertTrue("Did not process committed request", + processedRequests.peek() == writeReq); + Assert.assertTrue("Did not process following read request", + processedRequests.containsAll(shouldBeProcessedAfterPending)); + Assert.assertTrue("Did not process committed request", + processor.committedRequests.isEmpty()); + Assert.assertTrue("Did not process committed request", + processor.pendingRequests.isEmpty()); + Assert.assertTrue("Did not remove from blockedQueuedRequests", + processor.queuedWriteRequests.isEmpty()); + } /** @@ -322,6 +478,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, 0x3, 1); processor.queuedRequests.add(firstCommittedReq); + 
processor.queuedWriteRequests.add(firstCommittedReq); processor.committedRequests.add(firstCommittedReq); Set<Request> allReads = new HashSet<Request>(); @@ -399,6 +556,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, sessionid, readReqId++); processor.queuedRequests.add(firstCommittedReq); + processor.queuedWriteRequests.add(firstCommittedReq); localRequests.add(firstCommittedReq); // queue read requests to queuedRequests @@ -463,6 +621,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase { CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), OpCode.create, sessionid, lastCXid); processor.queuedRequests.add(orphanCommittedReq); + processor.queuedWriteRequests.add(orphanCommittedReq); localRequests.add(orphanCommittedReq); // queue read requests to queuedRequests diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java index 426aae9..f284052 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java @@ -51,6 +51,10 @@ public class CommitProcessorMetricsTest extends ZKTestCase { public void setup() { LOG.info("setup"); ServerMetrics.getMetrics().resetAll(); + + // ensure no leaked parallelism properties + System.clearProperty("zookeeper.commitProcessor.maxReadBatchSize"); + System.clearProperty("zookeeper.commitProcessor.maxCommitBatchSize"); } public void setupProcessors(int commitWorkers, int finalProcTime ) {