Author: phunt
Date: Fri Dec 7 23:38:00 2012
New Revision: 1418555
URL: http://svn.apache.org/viewvc?rev=1418555&view=rev
Log:
ZOOKEEPER-1505. Multi-thread CommitProcessor (Jay Shrauner via phunt)
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/WorkerService.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
Modified: zookeeper/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1418555&r1=1418554&r2=1418555&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Dec 7 23:38:00 2012
@@ -403,6 +403,9 @@ IMPROVEMENTS:
ZOOKEEPER-1238. Linger time should be -1 for Netty sockets. (Skye
W-M via henryr)
+ ZOOKEEPER-1505. Multi-thread CommitProcessor (Jay Shrauner via phunt)
+
+
Release 3.4.0 -
Non-backward compatible changes:
Modified:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1418555&r1=1418554&r2=1418555&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Fri Dec 7 23:38:00 2012
@@ -95,22 +95,22 @@ public class FinalRequestProcessor imple
}
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
- while (!zks.outstandingChanges.isEmpty()
- && zks.outstandingChanges.get(0).zxid <= request.zxid) {
- ChangeRecord cr = zks.outstandingChanges.remove(0);
- if (cr.zxid < request.zxid) {
- LOG.warn("Zxid outstanding "
- + cr.zxid
- + " is less than current " + request.zxid);
- }
- if (zks.outstandingChangesForPath.get(cr.path) == cr) {
- zks.outstandingChangesForPath.remove(cr.path);
- }
- }
if (request.getHdr() != null) {
TxnHeader hdr = request.getHdr();
Record txn = request.getTxn();
-
+ long zxid = hdr.getZxid();
+ while (!zks.outstandingChanges.isEmpty()
+ && zks.outstandingChanges.get(0).zxid <= zxid) {
+ ChangeRecord cr = zks.outstandingChanges.remove(0);
+ if (cr.zxid < zxid) {
+ LOG.warn("Zxid outstanding " + cr.zxid
+ + " is less than current " + zxid);
+ }
+ if (zks.outstandingChangesForPath.get(cr.path) == cr) {
+ zks.outstandingChangesForPath.remove(cr.path);
+ }
+ }
+
rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
Modified:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java?rev=1418555&r1=1418554&r2=1418555&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java Fri Dec 7 23:38:00 2012
@@ -21,8 +21,8 @@ package org.apache.zookeeper.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
@@ -128,7 +128,8 @@ public abstract class ServerCnxnFactory
public abstract InetSocketAddress getLocalAddress();
- private final HashMap<ServerCnxn, ConnectionBean> connectionBeans = new HashMap<ServerCnxn, ConnectionBean>();
+ private final ConcurrentHashMap<ServerCnxn, ConnectionBean> connectionBeans =
+ new ConcurrentHashMap<ServerCnxn, ConnectionBean>();
protected final HashSet<ServerCnxn> cnxns = new HashSet<ServerCnxn>();
public void unregisterConnection(ServerCnxn serverCnxn) {
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/WorkerService.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/WorkerService.java?rev=1418555&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/WorkerService.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/WorkerService.java Fri Dec 7 23:38:00 2012
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * WorkerService is a worker thread pool for running tasks and is implemented
+ * using one or more ExecutorServices. A WorkerService can support assignable
+ * threads, which it does by creating N separate single thread ExecutorServices,
+ * or non-assignable threads, which it does by creating a single N-thread
+ * ExecutorService.
+ * - NIOServerCnxnFactory uses a non-assignable WorkerService because the
+ * socket IO requests are order independent and allowing the
+ * ExecutorService to handle thread assignment gives optimal performance.
+ * - CommitProcessor uses an assignable WorkerService because requests for
+ * a given session must be processed in order.
+ * ExecutorService provides queue management and thread restarting, so it's
+ * useful even with a single thread.
+ */
+public class WorkerService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WorkerService.class);
+
+ private final ArrayList<ExecutorService> workers =
+ new ArrayList<ExecutorService>();
+
+ private final String threadNamePrefix;
+ private int numWorkerThreads;
+ private boolean threadsAreAssignable;
+ private long shutdownTimeoutMS = 5000;
+
+ private volatile boolean stopped = true;
+
+ /**
+ * @param name worker threads are named <name>Thread-##
+ * @param numThreads number of worker threads (0 - N)
+ * If 0, scheduled work is run immediately by
+ * the calling thread.
+ * @param useAssignableThreads whether the worker threads should be
+ * individually assignable or not
+ */
+ public WorkerService(String name, int numThreads,
+ boolean useAssignableThreads) {
+ this.threadNamePrefix = (name == null ? "" : name) + "Thread";
+ this.numWorkerThreads = numThreads;
+ this.threadsAreAssignable = useAssignableThreads;
+ start();
+ }
+
+ /**
+ * Callers should implement a class extending WorkRequest in order to
+ * schedule work with the service.
+ */
+ public static abstract class WorkRequest {
+ /**
+ * Must be implemented. Is called when the work request is run.
+ */
+ public abstract void doWork() throws Exception;
+
+ /**
+ * (Optional) If implemented, is called if the service is stopped
+ * or unable to schedule the request.
+ */
+ public void cleanup() {
+ }
+ }
+
+ /**
+ * Schedule work to be done. If a worker thread pool is not being
+ * used, work is done directly by this thread. This schedule API is
+ * for use with non-assignable WorkerServices. For assignable
+ * WorkerServices, will always run on the first thread.
+ */
+ public void schedule(WorkRequest workRequest) {
+ schedule(workRequest, 0);
+ }
+
+ /**
+ * Schedule work to be done by the thread assigned to this id. Thread
+ * assignment is a single mod operation on the number of threads. If a
+ * worker thread pool is not being used, work is done directly by
+ * this thread.
+ */
+ public void schedule(WorkRequest workRequest, long id) {
+ if (stopped) {
+ workRequest.cleanup();
+ return;
+ }
+
+ ScheduledWorkRequest scheduledWorkRequest =
+ new ScheduledWorkRequest(workRequest);
+
+ // If we have a worker thread pool, use that; otherwise, do the work
+ // directly.
+ int size = workers.size();
+ if (size > 0) {
+ try {
+ // make sure to map negative ids as well to [0, size-1]
+ int workerNum = ((int) (id % size) + size) % size;
+ ExecutorService worker = workers.get(workerNum);
+ worker.execute(scheduledWorkRequest);
+ } catch (RejectedExecutionException e) {
+ LOG.warn("ExecutorService rejected execution", e);
+ workRequest.cleanup();
+ }
+ } else {
+ scheduledWorkRequest.run();
+ }
+ }
+
+ private class ScheduledWorkRequest implements Runnable {
+ private final WorkRequest workRequest;
+
+ ScheduledWorkRequest(WorkRequest workRequest) {
+ this.workRequest = workRequest;
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Check if stopped while request was on queue
+ if (stopped) {
+ workRequest.cleanup();
+ return;
+ }
+ workRequest.doWork();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ workRequest.cleanup();
+ }
+ }
+ }
+
+ /**
+ * ThreadFactory for the worker thread pool. We don't use the default
+ * thread factory because (1) we want to give the worker threads easier
+ * to identify names; and (2) we want to make the worker threads daemon
+ * threads so they don't block the server from shutting down.
+ */
+ private static class DaemonThreadFactory implements ThreadFactory {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix;
+
+ DaemonThreadFactory(String name) {
+ this(name, 1);
+ }
+
+ DaemonThreadFactory(String name, int firstThreadNum) {
+ threadNumber.set(firstThreadNum);
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null)? s.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+ namePrefix = name + "-";
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r,
+ namePrefix + threadNumber.getAndIncrement(),
+ 0);
+ if (!t.isDaemon())
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ }
+
+ public void start() {
+ if (numWorkerThreads > 0) {
+ if (threadsAreAssignable) {
+ for(int i = 1; i <= numWorkerThreads; ++i) {
+ workers.add(Executors.newFixedThreadPool(
+ 1, new DaemonThreadFactory(threadNamePrefix, i)));
+ }
+ } else {
+ workers.add(Executors.newFixedThreadPool(
+ numWorkerThreads, new DaemonThreadFactory(threadNamePrefix)));
+ }
+ }
+ stopped = false;
+ }
+
+ public void stop() {
+ stopped = true;
+
+ // Signal for graceful shutdown
+ for(ExecutorService worker : workers) {
+ worker.shutdown();
+ }
+ }
+
+ public void join(long shutdownTimeoutMS) {
+ // Give the worker threads time to finish executing
+ long now = System.currentTimeMillis();
+ long endTime = now + shutdownTimeoutMS;
+ for(ExecutorService worker : workers) {
+ boolean terminated = false;
+ while ((now = System.currentTimeMillis()) <= endTime) {
+ try {
+ terminated = worker.awaitTermination(
+ endTime - now, TimeUnit.MILLISECONDS);
+ break;
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ if (!terminated) {
+ // If we've timed out, do a hard shutdown
+ worker.shutdownNow();
+ }
+ }
+ }
+}
Modified:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1418555&r1=1418554&r2=1418555&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Fri Dec 7 23:38:00 2012
@@ -18,36 +18,88 @@
package org.apache.zookeeper.server.quorum;
-import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.WorkerService;
/**
* This RequestProcessor matches the incoming committed requests with the
* locally submitted requests. The trick is that locally submitted requests that
* change the state of the system will come back as incoming committed requests,
* so we need to match them up.
+ *
+ * The CommitProcessor is multi-threaded. Communication between threads is
+ * handled via queues, atomics, and wait/notifyAll synchronized on the
+ * processor. The CommitProcessor acts as a gateway for allowing requests to
+ * continue with the remainder of the processing pipeline. It will allow many
+ * read requests but only a single write request to be in flight simultaneously,
+ * thus ensuring that write requests are processed in transaction id order.
+ *
+ * - 1 commit processor main thread, which watches the request queues and
+ * assigns requests to worker threads based on their sessionId so that
+ * read and write requests for a particular session are always assigned
+ * to the same thread (and hence are guaranteed to run in order).
+ * - 0-N worker threads, which run the rest of the request processor pipeline
+ * on the requests. If configured with 0 worker threads, the primary
+ * commit processor thread runs the pipeline directly.
+ *
+ * Typical (default) thread counts are: on a 32 core machine, 1 commit
+ * processor thread and 32 worker threads.
+ *
+ * Multi-threading constraints:
+ * - Each session's requests must be processed in order.
+ * - Write requests must be processed in zxid order
+ * - Must ensure no race condition between writes in one session that would
+ * trigger a watch being set by a read request in another session
+ *
+ * The current implementation solves the third constraint by simply allowing no
+ * read requests to be processed in parallel with write requests.
*/
public class CommitProcessor extends Thread implements RequestProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(CommitProcessor.class);
+ /** Default: numCores */
+ public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS =
+ "zookeeper.commitProcessor.numWorkerThreads";
+ /** Default worker pool shutdown timeout in ms: 5000 (5s) */
+ public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
+ "zookeeper.commitProcessor.shutdownTimeout";
+
/**
* Requests that we are holding until the commit comes in.
*/
- LinkedList<Request> queuedRequests = new LinkedList<Request>();
+ private final LinkedBlockingQueue<Request> queuedRequests =
+ new LinkedBlockingQueue<Request>();
/**
* Requests that have been committed.
*/
- LinkedList<Request> committedRequests = new LinkedList<Request>();
+ private final LinkedBlockingQueue<Request> committedRequests =
+ new LinkedBlockingQueue<Request>();
+
+ /** Request for which we are currently awaiting a commit */
+ private final AtomicReference<Request> nextPending =
+ new AtomicReference<Request>();
+ /** Request currently being committed (ie, sent off to next processor) */
+ private final AtomicReference<Request> currentlyCommitting =
+ new AtomicReference<Request>();
+
+ /** The number of requests currently being processed */
+ private AtomicInteger numRequestsProcessing = new AtomicInteger(0);
RequestProcessor nextProcessor;
+ private volatile boolean stopped = true;
+ private long workerShutdownTimeoutMS;
+ private WorkerService workerPool;
+
/**
* This flag indicates whether we need to wait for a response to come back from the
* leader or we just let the sync operation flow through like a read. The flag will
@@ -55,87 +107,107 @@ public class CommitProcessor extends Thr
*/
boolean matchSyncs;
- public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs) {
+ public CommitProcessor(RequestProcessor nextProcessor, String id,
+ boolean matchSyncs) {
super("CommitProcessor:" + id);
this.nextProcessor = nextProcessor;
this.matchSyncs = matchSyncs;
}
- volatile boolean finished = false;
+ private boolean isProcessingRequest() {
+ return numRequestsProcessing.get() != 0;
+ }
+
+ private boolean isWaitingForCommit() {
+ return nextPending.get() != null;
+ }
+
+ private boolean isProcessingCommit() {
+ return currentlyCommitting.get() != null;
+ }
+
+ protected boolean needCommit(Request request) {
+ switch (request.type) {
+ case OpCode.create:
+ case OpCode.delete:
+ case OpCode.setData:
+ case OpCode.multi:
+ case OpCode.setACL:
+ case OpCode.createSession:
+ case OpCode.closeSession:
+ return true;
+ case OpCode.sync:
+ return matchSyncs;
+ default:
+ return false;
+ }
+ }
@Override
public void run() {
- ArrayList<Request> toProcess = new ArrayList<Request>();
+ Request request;
try {
- Request nextPending = null;
- while (!finished) {
- for (Request request : toProcess) {
- nextProcessor.processRequest(request);
- }
- toProcess.clear();
- synchronized (this) {
- if ((queuedRequests.isEmpty() || nextPending != null) && committedRequests.isEmpty()) {
+ while (!stopped) {
+ synchronized(this) {
+ while (
+ !stopped &&
+ ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
+ (committedRequests.isEmpty() || isProcessingRequest()))) {
wait();
- continue;
- }
- // First check and see if the commit came in for the pending request
- if ((queuedRequests.isEmpty() || nextPending != null) && !committedRequests.isEmpty()) {
- Request r = committedRequests.remove();
- /*
- * We match with nextPending so that we can move to the
- * next request when it is committed. We also want to
- * use nextPending because it has the cnxn member set
- * properly.
- */
- if (nextPending != null
- && nextPending.sessionId == r.sessionId
- && nextPending.cxid == r.cxid) {
- // we want to send our version of the request.
- // the pointer to the connection in the request
- nextPending.setHdr(r.getHdr());
- nextPending.setTxn(r.getTxn());
- nextPending.zxid = r.zxid;
- toProcess.add(nextPending);
- nextPending = null;
- } else {
- // this request came from someone else so just send the commit packet
- toProcess.add(r);
- }
}
}
- // We haven't matched the pending requests, so go back to waiting
- if (nextPending != null) {
- continue;
+ /*
+ * Processing queuedRequests: Process the next requests until we
+ * find one for which we need to wait for a commit. We cannot
+ * process a read request while we are processing write request.
+ */
+ while (!stopped && !isWaitingForCommit() &&
+ !isProcessingCommit() &&
+ (request = queuedRequests.poll()) != null) {
+ if (needCommit(request)) {
+ nextPending.set(request);
+ } else {
+ sendToNextProcessor(request);
+ }
}
- synchronized (this) {
- // Process the next requests in the queuedRequests
- while (nextPending == null && !queuedRequests.isEmpty()) {
- Request request = queuedRequests.remove();
- switch (request.type) {
- case OpCode.create:
- case OpCode.delete:
- case OpCode.setData:
- case OpCode.multi:
- case OpCode.check:
- case OpCode.setACL:
- case OpCode.createSession:
- case OpCode.closeSession:
- nextPending = request;
- break;
- case OpCode.sync:
- if (matchSyncs) {
- nextPending = request;
- } else {
- toProcess.add(request);
- }
- break;
- default:
- toProcess.add(request);
- }
+ /*
+ * Processing committedRequests: check and see if the commit
+ * came in for the pending request. We can only commit a
+ * request when there is no other request being processed.
+ */
+ if (!stopped && !isProcessingRequest() &&
+ (request = committedRequests.poll()) != null) {
+ /*
+ * We match with nextPending so that we can move to the
+ * next request when it is committed. We also want to
+ * use nextPending because it has the cnxn member set
+ * properly.
+ */
+ Request pending = nextPending.get();
+ if (pending != null &&
+ pending.sessionId == request.sessionId &&
+ pending.cxid == request.cxid) {
+ // we want to send our version of the request.
+ // the pointer to the connection in the request
+ pending.setHdr(request.getHdr());
+ pending.setTxn(request.getTxn());
+ pending.zxid = request.zxid;
+ // Set currentlyCommitting so we will block until this
+ // completes. Cleared by CommitWorkRequest after
+ // nextProcessor returns.
+ currentlyCommitting.set(pending);
+ nextPending.set(null);
+ sendToNextProcessor(pending);
+ } else {
+ // this request came from someone else so just
+ // send the commit packet
+ currentlyCommitting.set(request);
+ sendToNextProcessor(request);
}
}
+
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
@@ -145,41 +217,129 @@ public class CommitProcessor extends Thr
LOG.info("CommitProcessor exited loop!");
}
- synchronized public void commit(Request request) {
- if (!finished) {
- if (request == null) {
- LOG.warn("Committed a null!",
- new Exception("committing a null! "));
- return;
+ @Override
+ public void start() {
+ int numCores = Runtime.getRuntime().availableProcessors();
+ int numWorkerThreads = Integer.getInteger(
+ ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
+ workerShutdownTimeoutMS = Long.getLong(
+ ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);
+
+ LOG.info("Configuring CommitProcessor with "
+ + (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ + " worker threads.");
+ if (workerPool == null) {
+ workerPool = new WorkerService(
+ "CommitProcWork", numWorkerThreads, true);
+ }
+ stopped = false;
+ super.start();
+ }
+
+ /**
+ * Schedule final request processing; if a worker thread pool is not being
+ * used, processing is done directly by this thread.
+ */
+ private void sendToNextProcessor(Request request) {
+ numRequestsProcessing.incrementAndGet();
+ workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
+ }
+
+ /**
+ * CommitWorkRequest is a small wrapper class to allow
+ * downstream processing to be run using the WorkerService
+ */
+ private class CommitWorkRequest extends WorkerService.WorkRequest {
+ private final Request request;
+
+ CommitWorkRequest(Request request) {
+ this.request = request;
+ }
+
+ @Override
+ public void cleanup() {
+ if (!stopped) {
+ LOG.error("Exception thrown by downstream processor,"
+ + " unable to continue.");
+ CommitProcessor.this.halt();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing request:: " + request);
+ }
+
+ public void doWork() throws RequestProcessorException {
+ try {
+ nextProcessor.processRequest(request);
+ } finally {
+ // If this request is the commit request that was blocking
+ // the processor, clear.
+ currentlyCommitting.compareAndSet(request, null);
+
+ /*
+ * Decrement outstanding request count. The processor may be
+ * blocked at the moment because it is waiting for the pipeline
+ * to drain. In that case, wake it up if there are pending
+ * requests.
+ */
+ if (numRequestsProcessing.decrementAndGet() == 0) {
+ if (!queuedRequests.isEmpty() ||
+ !committedRequests.isEmpty()) {
+ wakeup();
+ }
+ }
}
- committedRequests.add(request);
- notifyAll();
}
}
- synchronized public void processRequest(Request request) {
+ synchronized private void wakeup() {
+ notifyAll();
+ }
+
+ public void commit(Request request) {
+ if (stopped || request == null) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing request:: " + request);
+ }
+ committedRequests.add(request);
+ if (!isProcessingCommit()) {
+ wakeup();
+ }
+ }
+
+ public void processRequest(Request request) {
+ if (stopped) {
+ return;
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
+ queuedRequests.add(request);
+ if (!isWaitingForCommit()) {
+ wakeup();
+ }
+ }
- if (!finished) {
- queuedRequests.add(request);
- notifyAll();
+ private void halt() {
+ stopped = true;
+ wakeup();
+ queuedRequests.clear();
+ if (workerPool != null) {
+ workerPool.stop();
}
}
public void shutdown() {
LOG.info("Shutting down");
- synchronized (this) {
- finished = true;
- queuedRequests.clear();
- notifyAll();
+
+ halt();
+
+ if (workerPool != null) {
+ workerPool.join(workerShutdownTimeoutMS);
}
+
if (nextProcessor != null) {
nextProcessor.shutdown();
}
}
+
}
Modified:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1418555&r1=1418554&r2=1418555&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Dec 7 23:38:00 2012
@@ -413,7 +413,7 @@ public class Leader {
Thread.sleep(self.tickTime);
self.tick++;
}
-
+
/**
* WARNING: do not use this for anything other than QA testing
* on a real cluster. Specifically to enable verification that quorum
@@ -421,7 +421,7 @@ public class Leader {
* ZOOKEEPER-1277. Without this option it would take a very long
* time (on order of a month say) to see the 4 billion writes
* necessary to cause the roll-over to occur.
- *
+ *
* This field allows you to override the zxid of the server. Typically
* you'll want to set it to something like 0xfffffff0 and then
* start the quorum, run some operations and see the re-election.
@@ -638,9 +638,23 @@ public class Leader {
*/
public void processRequest(Request request) throws RequestProcessorException {
next.processRequest(request);
- Proposal p = leader.toBeApplied.peek();
- if (p != null && p.request != null && p.request.zxid == request.zxid) {
- leader.toBeApplied.remove();
+
+ // The only requests that should be on toBeApplied are write
+ // requests, for which we will have a hdr. We can't simply use
+ // request.zxid here because that is set on read requests to equal
+ // the zxid of the last write op.
+ if (request.getHdr() != null) {
+ long zxid = request.getHdr().getZxid();
+ Iterator<Proposal> iter = leader.toBeApplied.iterator();
+ if (iter.hasNext()) {
+ Proposal p = iter.next();
+ if (p.request != null && p.request.zxid == zxid) {
+ iter.remove();
+ return;
+ }
+ }
+ LOG.error("Committed request not found on toBeApplied: "
+ + request);
}
}
@@ -715,7 +729,7 @@ public class Leader {
public long getEpoch(){
return ZxidUtils.getEpochFromZxid(lastProposed);
}
-
+
@SuppressWarnings("serial")
public static class XidRolloverException extends Exception {
public XidRolloverException(String message) {
Modified: zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml?rev=1418555&r1=1418554&r2=1418555&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml (original)
+++ zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml Fri Dec 7 23:38:00 2012
@@ -131,4 +131,12 @@
<Method name="writeLongToFile" />
</Match>
+ <!-- this is a helper routine to wakeup the main thread with the
+ state change happening in the routines that call it -->
+ <Match>
+ <Class name="org.apache.zookeeper.server.quorum.CommitProcessor"/>
+ <Bug pattern="NN_NAKED_NOTIFY" />
+ <Method name="wakeup" />
+ </Match>
+
</FindBugsFilter>
Added:
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java?rev=1418555&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java Fri Dec 7 23:38:00 2012
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.GetDataRequest;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.SessionTracker;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.quorum.CommitProcessor;
+import org.apache.zookeeper.test.ClientBase;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The following invariants hold regardless of the particular implementation
+ * of the CommitProcessor, and are tested for here:
+ *
+ * 1. For each session, requests are processed and the client sees its
+ * responses in order.
+ * 2. Write requests are processed in zxid order across all sessions.
+ *
+ * The following are also tested for here, but are specific to this
+ * particular implementation. The underlying issue is that watches can be
+ * reset while reading the data. For reads/writes on two different sessions
+ * on different nodes, or with reads that do not set watches, the reads can
+ * happen in any order relative to the writes. For a read in one session that
+ * resets a watch that is triggered by a write on another session, however,
+ * we need to ensure that there is no race condition:
+ *
+ * 3. The pipeline needs to be drained before a write request can enter.
+ * 4. No in-flight write requests while processing a read request.
+ */
+public class CommitProcessorTest {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(CommitProcessorTest.class);
+
+
+ private AtomicInteger processedReadRequests = new AtomicInteger(0);
+ private AtomicInteger processedWriteRequests = new AtomicInteger(0);
+
+ TestZooKeeperServer zks;
+ File tmpDir;
+ ArrayList<TestClientThread> testClients =
+ new ArrayList<TestClientThread>();
+
+ public void setUp(int numCommitThreads, int numClientThreads)
+ throws Exception {
+ System.setProperty(
+ CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS,
+ Integer.toString(numCommitThreads));
+ tmpDir = ClientBase.createTmpDir();
+ ClientBase.setupTestEnv();
+ zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
+ zks.startup();
+ for(int i=0; i<numClientThreads; ++i) {
+ TestClientThread client = new TestClientThread();
+ testClients.add(client);
+ client.start();
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("tearDown starting");
+ for(TestClientThread client : testClients) {
+ client.interrupt();
+ client.join();
+ }
+ zks.shutdown();
+
+ if (tmpDir != null) {
+ Assert.assertTrue("delete " + tmpDir.toString(),
+ ClientBase.recursiveDelete(tmpDir));
+ }
+ }
+
+ private class TestClientThread extends Thread {
+ long sessionId;
+ int cxid;
+ int nodeId;
+
+ public TestClientThread() {
+ sessionId = zks.getSessionTracker().createSession(5000);
+ }
+
+ public void sendWriteRequest() throws Exception {
+ ByteArrayOutputStream boas = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+ CreateRequest createReq = new CreateRequest(
+ "/session" + Long.toHexString(sessionId) + "-" + (++nodeId),
+ new byte[0], Ids.OPEN_ACL_UNSAFE, 1);
+ createReq.serialize(boa, "request");
+ ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+ Request req = new Request(null, sessionId, ++cxid, OpCode.create,
+ bb, new ArrayList<Id>());
+ zks.firstProcessor.processRequest(req);
+ }
+
+ public void sendReadRequest() throws Exception {
+ ByteArrayOutputStream boas = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+ GetDataRequest getDataRequest = new GetDataRequest(
+ "/session" + Long.toHexString(sessionId) + "-" + nodeId, false);
+ getDataRequest.serialize(boa, "request");
+ ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+ Request req = new Request(null, sessionId, ++cxid, OpCode.getData,
+ bb, new ArrayList<Id>());
+ zks.firstProcessor.processRequest(req);
+ }
+
+ public void run() {
+ Random rand = new Random(Thread.currentThread().getId());
+ try {
+ sendWriteRequest();
+ for(int i=0; i<1000; ++i) {
+ // Do 25% write / 75% read request mix
+ if (rand.nextInt(100) < 25) {
+ sendWriteRequest();
+ } else {
+ sendReadRequest();
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Uncaught exception in test: ", e);
+ }
+ }
+ }
+
+ @Test
+ public void testNoCommitWorkers() throws Exception {
+ setUp(0, 10);
+ synchronized(this) {
+ wait(5000);
+ }
+ checkProcessedRequest();
+ Assert.assertFalse(fail);
+ }
+
+ @Test
+ public void testOneCommitWorker() throws Exception {
+ setUp(1, 10);
+ synchronized(this) {
+ wait(5000);
+ }
+ checkProcessedRequest();
+ Assert.assertFalse(fail);
+ }
+
+ @Test
+ public void testManyCommitWorkers() throws Exception {
+ setUp(10, 10);
+ synchronized(this) {
+ wait(5000);
+ }
+ checkProcessedRequest();
+ Assert.assertFalse(fail);
+
+ }
+
+ private void checkProcessedRequest() {
+ Assert.assertTrue("No read requests processed",
+ processedReadRequests.get() > 0);
+ Assert.assertTrue("No write requests processed",
+ processedWriteRequests.get() > 0);
+ }
+
+ volatile boolean fail = false;
+ synchronized private void failTest(String reason) {
+ fail = true;
+ notifyAll();
+ Assert.fail(reason);
+ }
+
+ private class TestZooKeeperServer extends ZooKeeperServer {
+ PrepRequestProcessor firstProcessor;
+ CommitProcessor commitProcessor;
+
+ public TestZooKeeperServer(File snapDir, File logDir, int tickTime)
+ throws IOException {
+ super(snapDir, logDir, tickTime);
+ }
+
+ public SessionTracker getSessionTracker() {
+ return sessionTracker;
+ }
+
+ // Leader mock: Prep -> MockProposal -> Commit -> validate -> Final
+ // Have side thread call commitProc.commit()
+ @Override
+ protected void setupRequestProcessors() {
+ RequestProcessor finalProcessor = new FinalRequestProcessor(zks);
+ // ValidateProcessor is set up in a similar fashion to ToBeApplied
+ // processor, so it can do pre/post validating of requests
+ ValidateProcessor validateProcessor =
+ new ValidateProcessor(finalProcessor);
+ commitProcessor = new CommitProcessor(validateProcessor, "1", true);
+ validateProcessor.setCommitProcessor(commitProcessor);
+ commitProcessor.start();
+ MockProposalRequestProcessor proposalProcessor =
+ new MockProposalRequestProcessor(commitProcessor);
+ proposalProcessor.start();
+ firstProcessor = new PrepRequestProcessor(zks, proposalProcessor);
+ firstProcessor.start();
+ }
+ }
+
+ private class MockProposalRequestProcessor extends Thread
+ implements RequestProcessor {
+ private final CommitProcessor commitProcessor;
+ private final LinkedBlockingQueue<Request> proposals =
+ new LinkedBlockingQueue<Request>();
+
+ public MockProposalRequestProcessor(CommitProcessor commitProcessor) {
+ this.commitProcessor = commitProcessor;
+ }
+
+ @Override
+ public void run() {
+ Random rand = new Random(Thread.currentThread().getId());
+ try {
+ while(true) {
+ Request request = proposals.take();
+ Thread.sleep(10 + rand.nextInt(190));
+ commitProcessor.commit(request);
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ @Override
+ public void processRequest(Request request)
+ throws RequestProcessorException {
+ commitProcessor.processRequest(request);
+ if (request.getHdr() != null) {
+ // fake propose request
+ proposals.add(request);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ // TODO Auto-generated method stub
+
+ }
+ }
+
+ private class ValidateProcessor implements RequestProcessor {
+ Random rand = new Random(Thread.currentThread().getId());
+ RequestProcessor nextProcessor;
+ CommitProcessor commitProcessor;
+ AtomicLong expectedZxid = new AtomicLong(1);
+ ConcurrentHashMap<Long, AtomicInteger> cxidMap =
+ new ConcurrentHashMap<Long, AtomicInteger>();
+
+ AtomicInteger outstandingReadRequests = new AtomicInteger(0);
+ AtomicInteger outstandingWriteRequests = new AtomicInteger(0);
+
+ public ValidateProcessor(RequestProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ public void setCommitProcessor(CommitProcessor commitProcessor) {
+ this.commitProcessor = commitProcessor;
+ }
+
+
+ @Override
+ public void processRequest(Request request)
+ throws RequestProcessorException {
+ boolean isWriteRequest = commitProcessor.needCommit(request);
+ if (isWriteRequest) {
+ outstandingWriteRequests.incrementAndGet();
+ validateWriteRequestVariant(request);
+ LOG.debug("Starting write request zxid=" + request.zxid);
+ } else {
+ LOG.debug("Starting read request cxid="
+ + request.cxid + " for session 0x"
+ + Long.toHexString(request.sessionId));
+ outstandingReadRequests.incrementAndGet();
+ validateReadRequestVariant(request);
+ }
+
+ // Insert random delay to test thread race conditions
+ try {
+ Thread.sleep(10 + rand.nextInt(290));
+ } catch(InterruptedException e) {
+ // ignore
+ }
+ nextProcessor.processRequest(request);
+
+ /*
+ * The commit workers will have to execute this line before they
+ * wake up the commit processor. So this value is up-to-date when
+ * the invariant check is performed.
+ */
+ if (isWriteRequest) {
+ outstandingWriteRequests.decrementAndGet();
+ LOG.debug("Done write request zxid=" + request.zxid);
+ processedWriteRequests.incrementAndGet();
+ } else {
+ outstandingReadRequests.decrementAndGet();
+ LOG.debug("Done read request cxid="
+ + request.cxid + " for session 0x"
+ + Long.toHexString(request.sessionId));
+ processedReadRequests.incrementAndGet();
+ }
+ validateRequest(request);
+ }
+
+ /**
+ * Validate that this is the only request in the pipeline
+ */
+ private void validateWriteRequestVariant(Request request) {
+ long zxid = request.getHdr().getZxid();
+ int readRequests = outstandingReadRequests.get();
+ if (readRequests != 0) {
+ failTest("There are " + readRequests + " outstanding"
+ + " read requests while issuing a write request zxid="
+ + zxid);
+ }
+ int writeRequests = outstandingWriteRequests.get();
+ if (writeRequests > 1) {
+ failTest("There are " + writeRequests + " outstanding"
+ + " write requests while issuing a write request zxid="
+ + zxid + " (expected one)");
+ }
+ }
+
+ /**
+ * Validate that no write request is in the pipeline while working
+ * on a read request
+ */
+ private void validateReadRequestVariant(Request request) {
+ int writeRequests = outstandingWriteRequests.get();
+ if (writeRequests != 0) {
+ failTest("There are " + writeRequests + " outstanding"
+ + " write requests while issuing a read request cxid="
+ + request.cxid + " for session 0x"
+ + Long.toHexString(request.sessionId));
+ }
+ }
+
+ private void validateRequest(Request request) {
+ LOG.info("Got request " + request);
+
+ // Zxids should always be in order for write requests
+ if (request.getHdr() != null) {
+ long zxid = request.getHdr().getZxid();
+ if (!expectedZxid.compareAndSet(zxid, zxid + 1)) {
+ failTest("Write request, expected_zxid="
+ + expectedZxid.get() + "; req_zxid=" + zxid);
+ }
+ }
+
+ // Each session should see its cxids in order
+ AtomicInteger sessionCxid = cxidMap.get(request.sessionId);
+ if (sessionCxid == null) {
+ sessionCxid = new AtomicInteger(request.cxid + 1);
+ AtomicInteger existingSessionCxid =
+ cxidMap.putIfAbsent(request.sessionId, sessionCxid);
+ if (existingSessionCxid != null) {
+ failTest("Race condition adding cxid=" + request.cxid
+ + " for session 0x"
+ + Long.toHexString(request.sessionId)
+ + " with other_cxid=" + existingSessionCxid.get());
+ }
+ } else {
+ if (!sessionCxid.compareAndSet(
+ request.cxid, request.cxid + 1)) {
+ failTest("Expected_cxid=" + sessionCxid.get()
+ + "; req_cxid=" + request.cxid);
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ // TODO Auto-generated method stub
+ }
+ }
+
+}