This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 876aaf4 ZOOKEEPER-3020: Review of SyncRequestProcessor
876aaf4 is described below
commit 876aaf42ea721776b82633bb769f8813e881a444
Author: Beluga Behr <[email protected]>
AuthorDate: Tue Apr 30 18:08:06 2019 +0200
ZOOKEEPER-3020: Review of SyncRequestProcessor
1. Use ArrayDeque instead of LinkedList
2. Use ThreadLocalRandom instead of Random
3. Remove the 'running' flag - use the Thread#join facility to detect if
the thread has stopped running. Using a flag can cause race condition issues
and is superfluous.
4. Make static final variable names in all caps
5. General cleanup
> This class is likely to be faster than Stack when used as a stack, and
faster than LinkedList when used as a queue.
https://docs.oracle.com/javase/7/docs/api/java/util/ArrayDeque.html
Author: Beluga Behr <[email protected]>
Reviewers: [email protected]
Closes #876 from BELUGABEHR/ZOOKEEPER-3020-2 and squashes the following
commits:
8f1df370a [Beluga Behr] General cleanup
83cab7382 [Beluga Behr] Simplify and streamline access pattern of
queuedRequests
5e0b801c8 [Beluga Behr] Use a semaphore to track status of snapshot thread
d119ef740 [Beluga Behr] No need to keep a flag which states if the thread
is alive
e2059cf6e [Beluga Behr] Replace LinkedList with ArrayDeque
57a044902 [Beluga Behr] Replace Random with ThreadLocalRandom
17bd1fa18 [Beluga Behr] Require that the request not be 'null'
b28ff9479 [Beluga Behr] Refactor flush method
---
.../zookeeper/server/SyncRequestProcessor.java | 185 ++++++++++-----------
1 file changed, 92 insertions(+), 93 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 57cca9c..9d661c4 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -20,9 +20,13 @@ package org.apache.zookeeper.server;
import java.io.Flushable;
import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Random;
+import java.util.ArrayDeque;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,28 +50,31 @@ import org.slf4j.LoggerFactory;
*/
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
RequestProcessor {
+
private static final Logger LOG =
LoggerFactory.getLogger(SyncRequestProcessor.class);
- private final ZooKeeperServer zks;
- private final LinkedBlockingQueue<Request> queuedRequests =
+
+ private static final int FLUSH_SIZE = 1000;
+
+ private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
+
+ /** The number of log entries to log before starting a snapshot */
+ private static int snapCount = ZooKeeperServer.getSnapCount();
+
+ private final BlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
- private final RequestProcessor nextProcessor;
- private Thread snapInProcess = null;
- volatile private boolean running;
+ private final Semaphore snapThreadMutex = new Semaphore(1);
+
+ private final ZooKeeperServer zks;
+
+ private final RequestProcessor nextProcessor;
/**
* Transactions that have been written and are waiting to be flushed to
* disk. Basically this is the list of SyncItems whose callbacks will be
* invoked after flush returns successfully.
*/
- private final LinkedList<Request> toFlush = new LinkedList<Request>();
- private final Random r = new Random();
- /**
- * The number of log entries to log before starting a snapshot
- */
- private static int snapCount = ZooKeeperServer.getSnapCount();
-
- private final Request requestOfDeath = Request.requestOfDeath;
+ private final Queue<Request> toFlush = new ArrayDeque<>(FLUSH_SIZE);
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
@@ -75,7 +82,6 @@ public class SyncRequestProcessor extends
ZooKeeperCriticalThread implements
.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
- running = true;
}
/**
@@ -102,103 +108,96 @@ public class SyncRequestProcessor extends
ZooKeeperCriticalThread implements
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
- int randRoll = r.nextInt(snapCount/2);
+ int randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2,
snapCount);
while (true) {
- Request si = null;
- if (toFlush.isEmpty()) {
+ Request si = queuedRequests.poll();
+ if (si == null) {
+ flush();
si = queuedRequests.take();
- } else {
- si = queuedRequests.poll();
- if (si == null) {
- flush(toFlush);
- continue;
- }
}
- if (si == requestOfDeath) {
+
+ if (si == REQUEST_OF_DEATH) {
break;
}
- if (si != null) {
- // track the number of records written to the log
- if (zks.getZKDatabase().append(si)) {
- logCount++;
- if (logCount > (snapCount / 2 + randRoll)) {
- randRoll = r.nextInt(snapCount/2);
- // roll the log
- zks.getZKDatabase().rollLog();
- // take a snapshot
- if (snapInProcess != null &&
snapInProcess.isAlive()) {
- LOG.warn("Too busy to snap, skipping");
- } else {
- snapInProcess = new ZooKeeperThread("Snapshot
Thread") {
- public void run() {
- try {
- zks.takeSnapshot();
- } catch(Exception e) {
- LOG.warn("Unexpected
exception", e);
- }
- }
- };
- snapInProcess.start();
- }
- logCount = 0;
- }
- } else if (toFlush.isEmpty()) {
- // optimization for read heavy workloads
- // iff this is a read, and there are no pending
- // flushes (writes), then just pass this to the next
- // processor
- if (nextProcessor != null) {
- nextProcessor.processRequest(si);
- if (nextProcessor instanceof Flushable) {
- ((Flushable)nextProcessor).flush();
- }
+
+ // track the number of records written to the log
+ if (zks.getZKDatabase().append(si)) {
+ logCount++;
+ if (logCount > randRoll) {
+ randRoll =
ThreadLocalRandom.current().nextInt(snapCount / 2, snapCount);
+ // roll the log
+ zks.getZKDatabase().rollLog();
+ // take a snapshot
+ if (!snapThreadMutex.tryAcquire()) {
+ LOG.warn("Too busy to snap, skipping");
+ } else {
+ new ZooKeeperThread("Snapshot Thread") {
+ public void run() {
+ try {
+ zks.takeSnapshot();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ } finally {
+ snapThreadMutex.release();
+ }
+ }
+ }.start();
}
- continue;
+ logCount = 0;
}
- toFlush.add(si);
- if (toFlush.size() > 1000) {
- flush(toFlush);
+ } else if (toFlush.isEmpty()) {
+ // optimization for read heavy workloads
+ // iff this is a read, and there are no pending
+ // flushes (writes), then just pass this to the next
+ // processor
+ if (nextProcessor != null) {
+ nextProcessor.processRequest(si);
+ if (nextProcessor instanceof Flushable) {
+ ((Flushable)nextProcessor).flush();
+ }
}
+ continue;
+ }
+ toFlush.add(si);
+ if (toFlush.size() == FLUSH_SIZE) {
+ flush();
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
- } finally{
- running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
- private void flush(LinkedList<Request> toFlush)
- throws IOException, RequestProcessorException
- {
- if (toFlush.isEmpty())
- return;
-
- zks.getZKDatabase().commit();
- while (!toFlush.isEmpty()) {
- Request i = toFlush.remove();
- if (nextProcessor != null) {
- nextProcessor.processRequest(i);
- }
- }
- if (nextProcessor != null && nextProcessor instanceof Flushable) {
- ((Flushable)nextProcessor).flush();
- }
+ private void flush() throws IOException, RequestProcessorException {
+ if (this.toFlush.isEmpty()) {
+ return;
+ }
+
+ zks.getZKDatabase().commit();
+
+ if (this.nextProcessor == null) {
+ this.toFlush.clear();
+ } else {
+ while (!this.toFlush.isEmpty()) {
+ final Request i = this.toFlush.remove();
+ this.nextProcessor.processRequest(i);
+ }
+ if (this.nextProcessor instanceof Flushable) {
+ ((Flushable)this.nextProcessor).flush();
+ }
+ }
}
public void shutdown() {
LOG.info("Shutting down");
- queuedRequests.add(requestOfDeath);
+ queuedRequests.add(REQUEST_OF_DEATH);
try {
- if(running){
- this.join();
- }
- if (!toFlush.isEmpty()) {
- flush(toFlush);
- }
- } catch(InterruptedException e) {
+ this.join();
+ this.flush();
+ } catch (InterruptedException e) {
LOG.warn("Interrupted while wating for " + this + " to finish");
+ Thread.currentThread().interrupt();
} catch (IOException e) {
LOG.warn("Got IO exception during shutdown");
} catch (RequestProcessorException e) {
@@ -209,8 +208,8 @@ public class SyncRequestProcessor extends
ZooKeeperCriticalThread implements
}
}
- public void processRequest(Request request) {
- // request.addRQRec(">sync");
+ public void processRequest(final Request request) {
+ Objects.requireNonNull(request, "Request cannot be null");
queuedRequests.add(request);
}