This is an automated email from the ASF dual-hosted git repository.
fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new cb9727d ZOOKEEPER-3268: Add commit processor metrics
cb9727d is described below
commit cb9727dc5b6f37984fef5f62473ae4064a9f15cb
Author: Jie Huang <[email protected]>
AuthorDate: Wed Apr 24 17:11:50 2019 -0700
ZOOKEEPER-3268: Add commit processor metrics
Author: Jie Huang <[email protected]>
Reviewers: [email protected], [email protected], [email protected]
Closes #800 from jhuan31/ZOOKEEPER-3268
---
.../java/org/apache/zookeeper/server/Request.java | 4 +
.../org/apache/zookeeper/server/ServerMetrics.java | 110 +++++
.../zookeeper/server/quorum/CommitProcessor.java | 109 ++++-
.../server/quorum/CommitProcessorMetricsTest.java | 499 +++++++++++++++++++++
4 files changed, 711 insertions(+), 11 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index f6c50bc..aef11bf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -80,6 +80,10 @@ public class Request {
public long prepQueueStartTime= -1;
+ public long commitProcQueueStartTime = -1;
+
+ public long commitRecvTime = -1;
+
private Object owner;
private KeeperException e;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 5fefdfc..3c089de 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -131,6 +131,61 @@ public final class ServerMetrics {
SESSIONLESS_CONNECTIONS_EXPIRED =
metricsContext.getCounter("sessionless_connections_expired");
STALE_SESSIONS_EXPIRED =
metricsContext.getCounter("stale_sessions_expired");
+ /*
+ * Number of requests that are in the session queue.
+ */
+ REQUESTS_IN_SESSION_QUEUE =
metricsContext.getSummary("requests_in_session_queue", DetailLevel.BASIC);
+ PENDING_SESSION_QUEUE_SIZE =
metricsContext.getSummary("pending_session_queue_size", DetailLevel.BASIC);
+ /*
+ * Number of consecutive read requests that are in the session queue
right after a commit request.
+ */
+ READS_AFTER_WRITE_IN_SESSION_QUEUE =
metricsContext.getSummary("reads_after_write_in_session_queue",
DetailLevel.BASIC);
+ READ_ISSUED_FROM_SESSION_QUEUE =
metricsContext.getSummary("reads_issued_from_session_queue", DetailLevel.BASIC);
+ SESSION_QUEUES_DRAINED =
metricsContext.getSummary("session_queues_drained", DetailLevel.BASIC);
+
+ TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ =
metricsContext.getSummary("time_waiting_empty_pool_in_commit_processor_read_ms",
DetailLevel.BASIC);
+ WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR =
metricsContext.getSummary("write_batch_time_in_commit_processor",
DetailLevel.BASIC);
+
+ CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR =
metricsContext.getSummary("concurrent_request_processing_in_commit_processor",
DetailLevel.BASIC);
+
+ READS_QUEUED_IN_COMMIT_PROCESSOR =
metricsContext.getSummary("read_commit_proc_req_queued", DetailLevel.BASIC);
+ WRITES_QUEUED_IN_COMMIT_PROCESSOR =
metricsContext.getSummary("write_commit_proc_req_queued", DetailLevel.BASIC);
+ COMMITS_QUEUED_IN_COMMIT_PROCESSOR =
metricsContext.getSummary("commit_commit_proc_req_queued", DetailLevel.BASIC);
+ COMMITS_QUEUED = metricsContext.getCounter("request_commit_queued");
+ READS_ISSUED_IN_COMMIT_PROC =
metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC);
+ WRITES_ISSUED_IN_COMMIT_PROC =
metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC);
+
+ /**
+ * Time spent by a read request in the commit processor.
+ */
+ READ_COMMITPROC_TIME =
metricsContext.getSummary("read_commitproc_time_ms", DetailLevel.ADVANCED);
+
+ /**
+ * Time spent by a write request in the commit processor.
+ */
+ WRITE_COMMITPROC_TIME =
metricsContext.getSummary("write_commitproc_time_ms", DetailLevel.ADVANCED);
+
+ /**
+ * Time spent by a committed request, for a locally issued write, in
the
+ * commit processor.
+ */
+ LOCAL_WRITE_COMMITTED_TIME =
metricsContext.getSummary("local_write_committed_time_ms",
DetailLevel.ADVANCED);
+
+ /**
+ * Time spent by a committed request for a write, issued by another
server, in the
+ * commit processor.
+ */
+ SERVER_WRITE_COMMITTED_TIME =
metricsContext.getSummary("server_write_committed_time_ms",
DetailLevel.ADVANCED);
+
+ COMMIT_PROCESS_TIME = metricsContext.getSummary("commit_process_time",
DetailLevel.BASIC);
+
+
+ /**
+ * Time spent by the final processor. This is tracked in the commit
processor.
+ */
+ READ_FINAL_PROC_TIME =
metricsContext.getSummary("read_final_proc_time_ms", DetailLevel.ADVANCED);
+ WRITE_FINAL_PROC_TIME =
metricsContext.getSummary("write_final_proc_time_ms", DetailLevel.ADVANCED);
+
}
/**
@@ -221,6 +276,61 @@ public final class ServerMetrics {
public final Counter RESPONSE_PACKET_CACHE_MISSING;
/*
+ * Number of requests that are in the session queue.
+ */
+ public final Summary REQUESTS_IN_SESSION_QUEUE;
+ public final Summary PENDING_SESSION_QUEUE_SIZE;
+ /*
+ * Number of consecutive read requests that are in the session queue right
after a commit request.
+ */
+ public final Summary READS_AFTER_WRITE_IN_SESSION_QUEUE;
+ public final Summary READ_ISSUED_FROM_SESSION_QUEUE;
+ public final Summary SESSION_QUEUES_DRAINED;
+
+ public final Summary TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ;
+ public final Summary WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR;
+
+ public final Summary CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR;
+
+ public final Summary READS_QUEUED_IN_COMMIT_PROCESSOR;
+ public final Summary WRITES_QUEUED_IN_COMMIT_PROCESSOR;
+ public final Summary COMMITS_QUEUED_IN_COMMIT_PROCESSOR;
+ public final Counter COMMITS_QUEUED;
+ public final Summary READS_ISSUED_IN_COMMIT_PROC;
+ public final Summary WRITES_ISSUED_IN_COMMIT_PROC;
+
+ /**
+ * Time spent by a read request in the commit processor.
+ */
+ public final Summary READ_COMMITPROC_TIME;
+
+ /**
+ * Time spent by a write request in the commit processor.
+ */
+ public final Summary WRITE_COMMITPROC_TIME;
+
+ /**
+ * Time spent by a committed request, for a locally issued write, in the
+ * commit processor.
+ */
+ public final Summary LOCAL_WRITE_COMMITTED_TIME;
+
+ /**
+ * Time spent by a committed request for a write, issued by another server,
in the
+ * commit processor.
+ */
+ public final Summary SERVER_WRITE_COMMITTED_TIME;
+
+ public final Summary COMMIT_PROCESS_TIME;
+
+
+ /**
+ * Time spent by the final processor. This is tracked in the commit
processor.
+ */
+ public final Summary READ_FINAL_PROC_TIME;
+ public final Summary WRITE_FINAL_PROC_TIME;
+
+ /*
* Number of successful matches of expected ensemble name in
EnsembleAuthenticationProvider.
*/
public final Counter ENSEMBLE_AUTH_SUCCESS;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
index aef8771..6e4b702 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -24,13 +24,14 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.WorkerService;
@@ -41,7 +42,7 @@ import org.apache.zookeeper.server.ZooKeeperServerListener;
* This RequestProcessor matches the incoming committed requests with the
* locally submitted requests. The trick is that locally submitted requests
that
* change the state of the system will come back as incoming committed
requests,
- * so we need to match them up. Instead of just waiting for the committed
requests,
+ * so we need to match them up. Instead of just waiting for the committed
requests,
* we process the uncommitted requests that belong to other sessions.
*
* The CommitProcessor is multi-threaded. Communication between threads is
@@ -89,6 +90,16 @@ public class CommitProcessor extends ZooKeeperCriticalThread
implements
new LinkedBlockingQueue<Request>();
/**
+ * The number of read requests currently held in all session queues
+ */
+ private AtomicInteger numReadQueuedRequests = new AtomicInteger(0);
+
+ /**
+ * The number of quorum requests currently held in all session queues
+ */
+ private AtomicInteger numWriteQueuedRequests = new AtomicInteger(0);
+
+ /**
* Requests that have been committed.
*/
protected final LinkedBlockingQueue<Request> committedRequests =
@@ -108,7 +119,7 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
/** For testing purposes, we use a separated stopping condition for the
* outer loop.*/
- protected volatile boolean stoppedMainLoop = true;
+ protected volatile boolean stoppedMainLoop = true;
protected volatile boolean stopped = true;
private long workerShutdownTimeoutMS;
protected WorkerService workerPool;
@@ -146,7 +157,7 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
case OpCode.setACL:
return true;
case OpCode.sync:
- return matchSyncs;
+ return matchSyncs;
case OpCode.createSession:
case OpCode.closeSession:
return !request.isLocalSession();
@@ -189,6 +200,13 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
}
}
}
+
+
ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
+
ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
+
ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size());
+
+ long time = Time.currentElapsedTime();
+
/*
* Processing up to requestsToProcess requests from the
incoming
* queue (queuedRequests), possibly less if a committed request
@@ -203,10 +221,11 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
|| pendingRequests.containsKey(request.sessionId))
{
// Add request to pending
pendingRequests
- .computeIfAbsent(request.sessionId, sid -> new
ArrayDeque<>())
- .add(request);
- }
- else {
+ .computeIfAbsent(request.sessionId, sid -> new
ArrayDeque<>())
+ .add(request);
+
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(pendingRequests.get(request.sessionId).size());
+ } else {
+ numReadQueuedRequests.decrementAndGet();
sendToNextProcessor(request);
}
/*
@@ -248,7 +267,9 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
*/
Deque<Request> sessionQueue = pendingRequests
.get(request.sessionId);
+
ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
if (sessionQueue != null) {
+
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
// If session queue != null, then it is also not empty.
Request topPending = sessionQueue.poll();
if (request.cxid != topPending.cxid) {
@@ -296,12 +317,15 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
topPending.setHdr(request.getHdr());
topPending.setTxn(request.getTxn());
topPending.zxid = request.zxid;
+ topPending.commitRecvTime = request.commitRecvTime;
request = topPending;
+
+ // Only decrement if we take a request off the
queue.
+ numWriteQueuedRequests.decrementAndGet();
}
}
sendToNextProcessor(request);
-
waitForEmptyPool();
/*
@@ -309,16 +333,24 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
* empty.
*/
if (sessionQueue != null) {
+ int readsAfterWrite = 0;
while (!stopped && !sessionQueue.isEmpty()
&& !needCommit(sessionQueue.peek())) {
+ numReadQueuedRequests.decrementAndGet();
sendToNextProcessor(sessionQueue.poll());
+ readsAfterWrite++;
}
+
ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
+
// Remove empty queues
if (sessionQueue.isEmpty()) {
pendingRequests.remove(request.sessionId);
}
}
}
+
+
ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() -
time);
+ endOfIteration();
} while (!stoppedMainLoop);
} catch (Throwable e) {
handleException(this.getName(), e);
@@ -326,12 +358,26 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
LOG.info("CommitProcessor exited loop!");
}
- private void waitForEmptyPool() throws InterruptedException {
+ //for test only
+ protected void endOfIteration() {
+
+ }
+
+ protected void waitForEmptyPool() throws InterruptedException {
+ int numRequestsInProcess = numRequestsProcessing.get();
+ if (numRequestsInProcess != 0) {
+
ServerMetrics.getMetrics().CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR.add(
+ numRequestsInProcess);
+ }
+
+ long startWaitTime = Time.currentElapsedTime();
synchronized(emptyPoolSync) {
while ((!stopped) && isProcessingRequest()) {
emptyPoolSync.wait();
}
}
+
ServerMetrics.getMetrics().TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ.add(
+ Time.currentElapsedTime() - startWaitTime);
}
@Override
@@ -385,8 +431,40 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
public void doWork() throws RequestProcessorException {
try {
+ if (needCommit(request)) {
+ if (request.commitProcQueueStartTime != -1 &&
+ request.commitRecvTime != -1) {
+ // Locally issued writes.
+ long currentTime = Time.currentElapsedTime();
+
ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime -
+ request.commitProcQueueStartTime);
+
ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime -
+ request.commitRecvTime);
+ } else if (request.commitRecvTime != -1) {
+ // Writes issued by other servers.
+
ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(
+ Time.currentElapsedTime() -
request.commitRecvTime);
+ }
+ } else {
+ if (request.commitProcQueueStartTime != -1) {
+ ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(
+ Time.currentElapsedTime() -
+ request.commitProcQueueStartTime);
+ }
+ }
+
+ long timeBeforeFinalProc = Time.currentElapsedTime();
nextProcessor.processRequest(request);
+ if (needCommit(request)) {
+ ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(
+ Time.currentElapsedTime() - timeBeforeFinalProc);
+ } else {
+ ServerMetrics.getMetrics().READ_FINAL_PROC_TIME.add(
+ Time.currentElapsedTime() - timeBeforeFinalProc);
+ }
+
} finally {
+
if (numRequestsProcessing.decrementAndGet() == 0){
wakeupOnEmpty();
}
@@ -404,7 +482,7 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
emptyPoolSync.notifyAll();
}
}
-
+
public void commit(Request request) {
if (stopped || request == null) {
return;
@@ -412,6 +490,8 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
if (LOG.isDebugEnabled()) {
LOG.debug("Committing request:: " + request);
}
+ request.commitRecvTime = Time.currentElapsedTime();
+ ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
committedRequests.add(request);
wakeup();
}
@@ -424,7 +504,14 @@ public class CommitProcessor extends
ZooKeeperCriticalThread implements
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
+ request.commitProcQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
+ // Count the queued request as a write if it needs a commit, otherwise as a read
+ if (needCommit(request)) {
+ numWriteQueuedRequests.incrementAndGet();
+ } else {
+ numReadQueuedRequests.incrementAndGet();
+ }
wakeup();
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
new file mode 100644
index 0000000..426aae9
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.*;
+
+public class CommitProcessorMetricsTest extends ZKTestCase {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(CommitProcessorMetricsTest.class);
+ CommitProcessor commitProcessor;
+ DummyFinalProcessor finalProcessor;
+
+ CountDownLatch requestScheduled = null;
+ CountDownLatch requestProcessed = null;
+ CountDownLatch commitSeen = null;
+ CountDownLatch poolEmpytied = null;
+
+ @Before
+ public void setup() {
+ LOG.info("setup");
+ ServerMetrics.getMetrics().resetAll();
+ }
+
+ public void setupProcessors(int commitWorkers, int finalProcTime ) {
+ finalProcessor = new DummyFinalProcessor(finalProcTime);
+ commitProcessor = new TestCommitProcessor(finalProcessor,
commitWorkers);
+ commitProcessor.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("tearDown starting");
+
+ commitProcessor.shutdown();
+ commitProcessor.join();
+ }
+
+ private class TestCommitProcessor extends CommitProcessor {
+ int numWorkerThreads;
+
+ public TestCommitProcessor(RequestProcessor finalProcessor, int
numWorkerThreads) {
+ super(finalProcessor, "1", true, null);
+ this.numWorkerThreads = numWorkerThreads;
+ }
+
+ @Override
+ public void start() {
+ super.workerPool = new TestWorkerService(numWorkerThreads);
+ super.start();
+ // Since there are two threads--the test thread that puts requests
into the queue and the processor
+ // thread (this thread) that removes requests from the queue--the
execution order in general is
+ // indeterminate, making it hard to check the test results.
+ //
+ // In some tests, we really want the requests processed one by
one. To achieve this, we make sure that
+ // things happen in this order:
+ // processor thread gets into WAITING -> test thread sets
requestProcessed latch -> test thread puts
+ // a request into the queue (which wakes up the processor thread
in the WAITING state) and waits for
+ // the requestProcessed latch -> the processor thread wakes up and
removes the request from the queue and
+ // processes it and opens the requestProcessed latch -> the test
thread continues onto the next request
+
+ // So it is important for the processor thread to get into WAITING
before any request is put into the queue.
+ // Otherwise, it would miss the wakeup signal and wouldn't process
the request or open the latch and the
+ // test thread waiting on the latch would be stuck
+ Thread.State state = super.getState();
+ while (state != State.WAITING) {
+ try {
+ Thread.sleep(50);
+ } catch (Exception e){
+
+ }
+ state = super.getState();
+ }
+ LOG.info("numWorkerThreads in Test is {}", numWorkerThreads);
+ }
+
+ @Override
+ protected void endOfIteration() {
+ if (requestProcessed != null) {
+ requestProcessed.countDown();
+ }
+ }
+
+ @Override
+ protected void waitForEmptyPool() throws InterruptedException {
+ if (commitSeen != null) {
+ commitSeen.countDown();
+ }
+ super.waitForEmptyPool();
+ if (poolEmpytied != null) {
+ poolEmpytied.countDown();
+ }
+ }
+ }
+
+ private class TestWorkerService extends WorkerService {
+ public TestWorkerService(int numWorkerThreads) {
+ super("CommitProcWork", numWorkerThreads, true);
+ }
+
+ @Override
+ public void schedule(WorkRequest workRequest, long id) {
+ super.schedule(workRequest, id);
+ if (requestScheduled != null) {
+ requestScheduled.countDown();
+ }
+ }
+ }
+
+ private class DummyFinalProcessor implements RequestProcessor {
+ int processTime;
+ public DummyFinalProcessor(int processTime) {
+ this.processTime = processTime;
+ }
+
+ @Override
+ public void processRequest(Request request) {
+ if (processTime > 0) {
+ try {
+ if (commitSeen != null) {
+ commitSeen.await(5, TimeUnit.SECONDS);
+ }
+ Thread.sleep(processTime);
+ } catch (Exception e) {
+
+ }
+ }
+ }
+
+ @Override
+ public void shutdown(){
+ }
+ }
+
+ private void checkMetrics(String metricName, long min, long max, double
avg, long cnt, long sum) {
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+
+ Assert.assertEquals("expected min is " + min, min, values.get("min_" +
metricName));
+ Assert.assertEquals("expected max is: " + max, max, values.get("max_"
+ metricName));
+ Assert.assertEquals("expected avg is: " + avg, avg,
(Double)values.get("avg_" + metricName), 0.001);
+ Assert.assertEquals("expected cnt is: " + cnt, cnt, values.get("cnt_"
+ metricName));
+ Assert.assertEquals("expected sum is: " + sum, sum, values.get("sum_"
+ metricName));
+ }
+
+ private void checkTimeMetric(long actual, long lBoundrary, long
hBoundrary) {
+ Assert.assertThat(actual, greaterThanOrEqualTo(lBoundrary));
+ Assert.assertThat(actual, lessThanOrEqualTo(hBoundrary));
+ }
+
+ private Request createReadRequest(long sessionId, int xid) {
+ return new Request(null, sessionId, xid, ZooDefs.OpCode.getData,
+ ByteBuffer.wrap(new byte[10]), null);
+ }
+
+ private Request createWriteRequest(long sessionId, int xid) {
+ return new Request(null, sessionId, xid, ZooDefs.OpCode.setData,
+ ByteBuffer.wrap(new byte[10]), null);
+ }
+
+ private void processRequestWithWait(Request request) throws Exception {
+ requestProcessed = new CountDownLatch(1);
+ commitProcessor.processRequest(request);
+ requestProcessed.await(5, TimeUnit.SECONDS);
+ }
+
+ private void commitWithWait(Request request) throws Exception {
+ requestProcessed = new CountDownLatch(1);
+ commitProcessor.commit(request);
+ requestProcessed.await(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testRequestsInSessionQueue() throws Exception {
+ setupProcessors(0, 0);
+
+ Request req1 = createWriteRequest(1l, 1);
+ processRequestWithWait(req1);
+
+ checkMetrics("requests_in_session_queue", 1L, 1L, 1D, 1L, 1L);
+
+ //these two read requests will be stuck in the session queue because
there is a write in front of them
+ processRequestWithWait(createReadRequest(1L, 2));
+ processRequestWithWait(createReadRequest(1L,3));
+
+ checkMetrics("requests_in_session_queue", 1L, 3L, 2D, 3L, 6);
+
+ commitWithWait(req1);
+
+ checkMetrics("requests_in_session_queue", 1L, 3L, 2.25D, 4L, 9);
+ }
+
+ @Test
+ public void testWriteFinalProcTime() throws Exception {
+ setupProcessors(0, 1000);
+
+ Request req1 = createWriteRequest(1l, 2);
+ processRequestWithWait(req1);
+
+ //no request sent to next processor yet
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(0L, values.get("cnt_write_final_proc_time_ms"));
+
+ commitWithWait(req1);
+
+ values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(1L, values.get("cnt_write_final_proc_time_ms"));
+ checkTimeMetric((long)values.get("max_write_final_proc_time_ms"),
1000L, 2000L);
+ }
+
+ @Test
+ public void testReadFinalProcTime() throws Exception {
+ setupProcessors(0, 1000);
+
+ processRequestWithWait(createReadRequest(1L, 1));
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(1L, values.get("cnt_read_final_proc_time_ms"));
+ checkTimeMetric((long)values.get("max_read_final_proc_time_ms"),
1000L, 2000L);
+ }
+
+ @Test
+ public void testCommitProcessTime() throws Exception {
+ setupProcessors(0, 0);
+ processRequestWithWait(createReadRequest(1L, 1));
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(1L, values.get("cnt_commit_process_time"));
+ checkTimeMetric((long)values.get("max_commit_process_time"), 0L,
1000L);
+ }
+
+ @Test
+ public void testServerWriteCommittedTime() throws Exception {
+ setupProcessors(0, 0);
+ //a commit w/o pending request is a write from other servers
+ commitWithWait(createWriteRequest(1L, 1));
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(1L,
values.get("cnt_server_write_committed_time_ms"));
+
checkTimeMetric((long)values.get("max_server_write_committed_time_ms"), 0L,
1000L);
+ }
+
+ @Test
+ public void testLocalWriteCommittedTime() throws Exception {
+ setupProcessors(0, 0);
+ Request req1 = createWriteRequest(1l, 2);
+ processRequestWithWait(req1);
+ commitWithWait(req1);
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+
+ Assert.assertEquals(1L,
values.get("cnt_local_write_committed_time_ms"));
+ checkTimeMetric((long)values.get("max_local_write_committed_time_ms"),
0l, 1000l);
+
+ Request req2 = createWriteRequest(1l, 2);
+ processRequestWithWait(req2);
+ //the second write will be stuck in the session queue for at least one
second
+ //but the LOCAL_WRITE_COMMITTED_TIME is from when the commit is
received
+ Thread.sleep(1000);
+
+ commitWithWait(req2);
+
+ values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(2L,
values.get("cnt_local_write_committed_time_ms"));
+ checkTimeMetric((long)values.get("max_local_write_committed_time_ms"),
0L, 1000L);
+ }
+
+
+ @Test
+ public void testWriteCommitProcTime() throws Exception {
+ setupProcessors(0, 0);
+ Request req1 = createWriteRequest(1l, 2);
+ processRequestWithWait(req1);
+ commitWithWait(req1);
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+
+ Assert.assertEquals(1L, values.get("cnt_write_commitproc_time_ms"));
+ checkTimeMetric((long)values.get("max_write_commitproc_time_ms"), 0l,
1000l);
+
+ Request req2 = createWriteRequest(1l, 2);
+ processRequestWithWait(req2);
+ //the second write will be stuck in the session queue for at least one
second
+ Thread.sleep(1000);
+
+ commitWithWait(req2);
+
+ values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(2L, values.get("cnt_write_commitproc_time_ms"));
+ checkTimeMetric((long)values.get("max_write_commitproc_time_ms"),
1000L, 2000L);
+ }
+
+
+ @Test
+ public void testReadCommitProcTime() throws Exception {
+ setupProcessors(0, 0);
+ processRequestWithWait(createReadRequest(1l, 1));
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+
+ Assert.assertEquals(1L, values.get("cnt_read_commitproc_time_ms"));
+ checkTimeMetric((long)values.get("max_read_commitproc_time_ms"), 0l,
1000l);
+
+ Request req1 = createWriteRequest(1l, 2);
+ processRequestWithWait(req1);
+ processRequestWithWait(createReadRequest(1l, 3));
+ //the second read will be stuck in the session queue for at least one
second
+ Thread.sleep(1000);
+
+ commitWithWait(req1);
+
+ values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(2L, values.get("cnt_read_commitproc_time_ms"));
+ checkTimeMetric((long)values.get("max_read_commitproc_time_ms"),
1000L, 2000L);
+ }
+
+
+ @Test
+ public void testTimeWaitingEmptyPoolInCommitProcessorRead() throws
Exception {
+ setupProcessors(1, 1000);
+
+ //three read requests will be scheduled first
+ requestScheduled = new CountDownLatch(3);
+ commitProcessor.processRequest(createReadRequest(0l, 2));
+ commitProcessor.processRequest(createReadRequest(1l, 3));
+ commitProcessor.processRequest(createReadRequest(2l, 4));
+ requestScheduled.await(5, TimeUnit.SECONDS);
+
+ //add a commit request to trigger waitForEmptyPool
+ poolEmpytied = new CountDownLatch(1);
+ commitProcessor.commit(createWriteRequest(1l, 1));
+ poolEmpytied.await(5, TimeUnit.SECONDS);
+
+ long actual =
(long)MetricsUtils.currentServerMetrics().get("max_time_waiting_empty_pool_in_commit_processor_read_ms");
+ //each request takes 1000ms to process, so the wait shouldn't
be more than three times that
+ checkTimeMetric(actual, 2500L, 3500L);
+ }
+
+
+ @Test
+ public void testConcurrentRequestProcessingInCommitProcessor() throws
Exception {
+ setupProcessors(3, 1000);
+
+ //three read requests will be processed in parallel
+ commitSeen = new CountDownLatch(1);
+ commitProcessor.processRequest(createReadRequest(1l, 2));
+ commitProcessor.processRequest(createReadRequest(1l, 3));
+ commitProcessor.processRequest(createReadRequest(1l, 4));
+
+ //add a commit request to trigger waitForEmptyPool, which will record
number of requests being processed
+ poolEmpytied = new CountDownLatch(1);
+ commitProcessor.commit(createWriteRequest(1l, 1));
+ poolEmpytied.await(5, TimeUnit.SECONDS);
+
+ //this will change after we upstream batch write in CommitProcessor
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(3L,
values.get("max_concurrent_request_processing_in_commit_processor"));
+ }
+
+ @Test
+ public void testReadsAfterWriteInSessionQueue() throws Exception {
+ setupProcessors(0, 0);
+ //this read request is before write
+ processRequestWithWait(createReadRequest(1l, 1));
+
+ //one write request
+ Request req1 = createWriteRequest(1l, 1);
+ processRequestWithWait(req1);
+
+
+ //three read requests after the write
+ processRequestWithWait(createReadRequest(1l, 2));
+ processRequestWithWait(createReadRequest(1l, 3));
+ processRequestWithWait(createReadRequest(1l, 4));
+
+ //commit the write
+ commitWithWait(req1);
+
+ checkMetrics("reads_after_write_in_session_queue", 3l, 3l, 3d, 1, 3);
+ }
+
+
+ @Test
+ public void testReadsQueuedInCommitProcessor() throws Exception {
+ setupProcessors(0, 0);
+ processRequestWithWait(createReadRequest(1l, 1));
+ processRequestWithWait(createReadRequest(1l, 2));
+
+ //recorded reads in the queue are 1, 1
+ checkMetrics("read_commit_proc_req_queued", 1l, 1l, 1d, 2, 2);
+ }
+
+ @Test
+ public void testWritesQueuedInCommitProcessor() throws Exception {
+ setupProcessors(0, 0);
+ Request req1 = createWriteRequest(1l, 1);
+ processRequestWithWait(req1);
+ Request req2 = createWriteRequest(1l, 2);
+ processRequestWithWait(req2);
+
+ //since we haven't got any commit request, the write request stays in
the queue
+ //recorded writes in the queue are 1, 2
+ checkMetrics("write_commit_proc_req_queued", 1l, 2l, 1.5d, 2, 3);
+
+ commitWithWait(req1);
+
+ //recording is done before commit request is processed, so writes in
the queue are: 1, 2, 2
+ checkMetrics("write_commit_proc_req_queued", 1l, 2l, 1.6667d, 3, 5);
+
+ commitWithWait(req2);
+ //writes in the queue are 1, 2, 2, 1
+ checkMetrics("write_commit_proc_req_queued", 1l, 2l, 1.5d, 4, 6);
+
+ //send a read request to trigger the recording, this time the write
queue should be empty
+ //writes in the queue are 1, 2, 2, 1, 0
+ processRequestWithWait(createReadRequest(1l, 1));
+
+ checkMetrics("write_commit_proc_req_queued", 0l, 2l, 1.2d, 5, 6);
+ }
+
+ @Test
+ public void testCommitsQueuedInCommitProcessor() throws Exception {
+ setupProcessors(0, 0);
+
+ commitWithWait(createWriteRequest(1l, 1));
+ commitWithWait(createWriteRequest(1l, 2));
+
+ //recorded commits in the queue are 1, 1
+ checkMetrics("commit_commit_proc_req_queued", 1l, 1l, 1d, 2, 2);
+ }
+
+ @Test
+ public void testCommitsQueued() throws Exception {
+ setupProcessors(0, 0);
+
+ commitWithWait(createWriteRequest(1l, 1));
+ commitWithWait(createWriteRequest(1l, 2));
+
+ Map<String, Object> values = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(2l, (long)values.get("request_commit_queued"));
+ }
+
+ @Test
+ public void testPendingSessionQueueSize() throws Exception {
+ setupProcessors(0, 0);
+
+ //one write request for session 1
+ Request req1 = createWriteRequest(1l, 1);
+ processRequestWithWait(req1);
+
+ //two write requests for session 2
+ Request req2 = createWriteRequest(2l, 2);
+ processRequestWithWait(req2);
+ Request req3 = createWriteRequest(2l, 3);
+ processRequestWithWait(req3);
+
+ commitWithWait(req1);
+ //there are two sessions with pending requests
+ checkMetrics("pending_session_queue_size", 2l, 2l, 2d, 1, 2);
+
+ commitWithWait(req2);
+ //there is one session with pending requests
+ checkMetrics("pending_session_queue_size", 1l, 2l, 1.5d, 2, 3);
+
+ commitWithWait(req3);
+ //there is one session with pending requests
+ checkMetrics("pending_session_queue_size", 1l, 2l, 1.333d, 3, 4);
+ }
+}