anmolnar commented on a change in pull request #876: Zookeeper 3020 2
URL: https://github.com/apache/zookeeper/pull/876#discussion_r271270150
 
 

 ##########
 File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
 ##########
 @@ -102,103 +108,96 @@ public void run() {
 
             // we do this in an attempt to ensure that not all of the servers
             // in the ensemble take a snapshot at the same time
-            int randRoll = r.nextInt(snapCount/2);
+            int randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2, 
snapCount);
             while (true) {
-                Request si = null;
-                if (toFlush.isEmpty()) {
+                Request si = queuedRequests.poll();
+                if (si == null) {
+                    flush();
                     si = queuedRequests.take();
-                } else {
-                    si = queuedRequests.poll();
-                    if (si == null) {
-                        flush(toFlush);
-                        continue;
-                    }
                 }
-                if (si == requestOfDeath) {
+  
+                if (si == REQUEST_OF_DEATH) {
                     break;
                 }
-                if (si != null) {
-                    // track the number of records written to the log
-                    if (zks.getZKDatabase().append(si)) {
-                        logCount++;
-                        if (logCount > (snapCount / 2 + randRoll)) {
-                            randRoll = r.nextInt(snapCount/2);
-                            // roll the log
-                            zks.getZKDatabase().rollLog();
-                            // take a snapshot
-                            if (snapInProcess != null && 
snapInProcess.isAlive()) {
-                                LOG.warn("Too busy to snap, skipping");
-                            } else {
-                                snapInProcess = new ZooKeeperThread("Snapshot 
Thread") {
-                                        public void run() {
-                                            try {
-                                                zks.takeSnapshot();
-                                            } catch(Exception e) {
-                                                LOG.warn("Unexpected 
exception", e);
-                                            }
-                                        }
-                                    };
-                                snapInProcess.start();
-                            }
-                            logCount = 0;
-                        }
-                    } else if (toFlush.isEmpty()) {
-                        // optimization for read heavy workloads
-                        // iff this is a read, and there are no pending
-                        // flushes (writes), then just pass this to the next
-                        // processor
-                        if (nextProcessor != null) {
-                            nextProcessor.processRequest(si);
-                            if (nextProcessor instanceof Flushable) {
-                                ((Flushable)nextProcessor).flush();
-                            }
+
+                // track the number of records written to the log
+                if (zks.getZKDatabase().append(si)) {
+                    logCount++;
+                    if (logCount > randRoll) {
+                        randRoll = 
ThreadLocalRandom.current().nextInt(snapCount / 2, snapCount);
+                        // roll the log
+                        zks.getZKDatabase().rollLog();
+                        // take a snapshot
+                        if (!snapThreadMutex.tryAcquire()) {
+                            LOG.warn("Too busy to snap, skipping");
+                        } else {
+                            new ZooKeeperThread("Snapshot Thread") {
+                                public void run() {
+                                    try {
+                                        zks.takeSnapshot();
+                                    } catch (Exception e) {
+                                        LOG.warn("Unexpected exception", e);
+                                    } finally {
+                                      snapThreadMutex.release();
+                                    }
+                                }
+                            }.start();
                         }
-                        continue;
+                        logCount = 0;
                     }
-                    toFlush.add(si);
-                    if (toFlush.size() > 1000) {
-                        flush(toFlush);
+                } else if (toFlush.isEmpty()) {
+                    // optimization for read heavy workloads
+                    // iff this is a read, and there are no pending
+                    // flushes (writes), then just pass this to the next
+                    // processor
+                    if (nextProcessor != null) {
+                        nextProcessor.processRequest(si);
+                        if (nextProcessor instanceof Flushable) {
+                            ((Flushable)nextProcessor).flush();
+                        }
                     }
+                    continue;
+                }
+                toFlush.add(si);
+                if (toFlush.size() == FLUSH_SIZE) {
+                    flush();
                 }
             }
         } catch (Throwable t) {
             handleException(this.getName(), t);
-        } finally{
-            running = false;
         }
         LOG.info("SyncRequestProcessor exited!");
     }
 
-    private void flush(LinkedList<Request> toFlush)
-        throws IOException, RequestProcessorException
-    {
-        if (toFlush.isEmpty())
-            return;
-
-        zks.getZKDatabase().commit();
-        while (!toFlush.isEmpty()) {
-            Request i = toFlush.remove();
-            if (nextProcessor != null) {
-                nextProcessor.processRequest(i);
-            }
-        }
-        if (nextProcessor != null && nextProcessor instanceof Flushable) {
-            ((Flushable)nextProcessor).flush();
-        }
+    private void flush() throws IOException, RequestProcessorException {
+      if (this.toFlush.isEmpty()) {
+          return;
+      }
+
+      zks.getZKDatabase().commit();
+
+      if (this.nextProcessor == null) {
+        this.toFlush.clear();
+      } else {
+          while (!this.toFlush.isEmpty()) {
+              final Request i = this.toFlush.remove();
+              this.nextProcessor.processRequest(i);
+          }
+          if (this.nextProcessor instanceof Flushable) {
+              ((Flushable)this.nextProcessor).flush();
+          } 
+      }
     }
 
     public void shutdown() {
         LOG.info("Shutting down");
-        queuedRequests.add(requestOfDeath);
+        queuedRequests.add(REQUEST_OF_DEATH);
         try {
-            if(running){
-                this.join();
-            }
-            if (!toFlush.isEmpty()) {
-                flush(toFlush);
-            }
-        } catch(InterruptedException e) {
+            this.join();
+            this.flush();
+        } catch (InterruptedException e) {
             LOG.warn("Interrupted while wating for " + this + " to finish");
 
 Review comment:
   Nit: prefer parameterized logging over string concatenation, and fix the "wating" typo while here: `LOG.warn("Interrupted while waiting for {} to finish", this);`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to