leventov commented on a change in pull request #7088: Improve parallelism of 
zookeeper based segment change processing
URL: https://github.com/apache/incubator-druid/pull/7088#discussion_r279053343
 
 

 ##########
 File path: 
server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
 ##########
 @@ -219,199 +220,182 @@ public void unmarkSegmentToDrop(DataSegment 
dataSegment)
     segmentsMarkedToDrop.remove(dataSegment);
   }
 
-  private void processSegmentChangeRequest()
+  private class SegmentChangeProcessor implements Runnable
   {
-    if (currentlyProcessing != null) {
-      log.debug(
-          "Server[%s] skipping processSegmentChangeRequest because something 
is currently loading[%s].",
-          basePath,
-          currentlyProcessing.getSegmentId()
-      );
+    private final SegmentHolder segmentHolder;
 
-      return;
-    }
-
-    if (!segmentsToDrop.isEmpty()) {
-      currentlyProcessing = segmentsToDrop.firstEntry().getValue();
-      log.debug("Server[%s] dropping [%s]", basePath, 
currentlyProcessing.getSegmentId());
-    } else if (!segmentsToLoad.isEmpty()) {
-      currentlyProcessing = segmentsToLoad.firstEntry().getValue();
-      log.debug("Server[%s] loading [%s]", basePath, 
currentlyProcessing.getSegmentId());
-    } else {
-      return;
+    private SegmentChangeProcessor(SegmentHolder segmentHolder)
+    {
+      this.segmentHolder = segmentHolder;
     }
 
-    try {
-      final String path = ZKPaths.makePath(basePath, 
currentlyProcessing.getSegmentId().toString());
-      final byte[] payload = 
jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
-      curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
-
-      processingExecutor.schedule(
-          () -> {
-            try {
-              if (curator.checkExists().forPath(path) != null) {
-                failAssign(new ISE("%s was never removed! Failing this 
operation!", path));
+    @Override
+    public void run()
+    {
+      try {
+        final String path = ZKPaths.makePath(basePath, 
segmentHolder.getSegmentIdentifier());
+        final byte[] payload = 
jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest());
+        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
+        log.debug(
+            "ZKNode created for server to [%s] %s [%s]",
+            basePath,
+            segmentHolder.getType() == LOAD ? "load" : "drop",
+            segmentHolder.getSegmentIdentifier()
+        );
+        final ScheduledFuture<?> future = monitorNodeRemovedExecutor.schedule(
+            () -> {
+              try {
+                if (curator.checkExists().forPath(path) != null) {
+                  failAssign(segmentHolder, new ISE("%s was never removed! 
Failing this operation!", path));
+                } else {
+                  log.debug("%s detected to be removed. ", path);
+                }
               }
-            }
-            catch (Exception e) {
-              failAssign(e);
-            }
-          },
-          config.getLoadTimeoutDelay().getMillis(),
-          TimeUnit.MILLISECONDS
-      );
+              catch (Exception e) {
+                failAssign(segmentHolder, e);
+              }
+            },
+            config.getLoadTimeoutDelay().getMillis(),
+            TimeUnit.MILLISECONDS
+        );
 
-      final Stat stat = curator.checkExists().usingWatcher(
-          (CuratorWatcher) watchedEvent -> {
-            switch (watchedEvent.getType()) {
-              case NodeDeleted:
-                entryRemoved(watchedEvent.getPath());
-                break;
-              default:
-                // do nothing
+        final Stat stat = curator.checkExists().usingWatcher(
+            (CuratorWatcher) watchedEvent -> {
+              switch (watchedEvent.getType()) {
+                case NodeDeleted:
+                  // Cancel the check node deleted task since we have already
+                  // been notified by the zk watcher
+                  future.cancel(true);
+                  entryRemoved(segmentHolder, watchedEvent.getPath());
+                  break;
+                default:
+                  // do nothing
+              }
             }
-          }
-      ).forPath(path);
-
-      if (stat == null) {
-        final byte[] noopPayload = jsonMapper.writeValueAsBytes(new 
SegmentChangeRequestNoop());
-
-        // Create a node and then delete it to remove the registered watcher.  
This is a work-around for
-        // a zookeeper race condition.  Specifically, when you set a watcher, 
it fires on the next event
-        // that happens for that node.  If no events happen, the watcher stays 
registered foreverz.
-        // Couple that with the fact that you cannot set a watcher when you 
create a node, but what we
-        // want is to create a node and then watch for it to get deleted.  The 
solution is that you *can*
-        // set a watcher when you check to see if it exists so, we first 
create the node and then set a
-        // watcher on its existence.  However, if already does not exist by 
the time the existence check
-        // returns, then the watcher that was set will never fire (nobody will 
ever create the node
-        // again) and thus lead to a slow, but real, memory leak.  So, we 
create another node to cause
-        // that watcher to fire and delete it right away.
-        //
-        // We do not create the existence watcher first, because then it will 
fire when we create the
-        // node and we'll have the same race when trying to refresh that 
watcher.
-        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, 
noopPayload);
-
-        entryRemoved(path);
+        ).forPath(path);
+
+        if (stat == null) {
+          final byte[] noopPayload = jsonMapper.writeValueAsBytes(new 
SegmentChangeRequestNoop());
+
+          // Create a node and then delete it to remove the registered 
watcher.  This is a work-around for
+          // a zookeeper race condition.  Specifically, when you set a 
watcher, it fires on the next event
+          // that happens for that node.  If no events happen, the watcher 
stays registered foreverz.
+          // Couple that with the fact that you cannot set a watcher when you 
create a node, but what we
+          // want is to create a node and then watch for it to get deleted.  
The solution is that you *can*
+          // set a watcher when you check to see if it exists so, we first 
create the node and then set a
+          // watcher on its existence.  However, if already does not exist by 
the time the existence check
+          // returns, then the watcher that was set will never fire (nobody 
will ever create the node
+          // again) and thus lead to a slow, but real, memory leak.  So, we 
create another node to cause
+          // that watcher to fire and delete it right away.
+          //
+          // We do not create the existence watcher first, because then it 
will fire when we create the
+          // node and we'll have the same race when trying to refresh that 
watcher.
+          curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, 
noopPayload);
+          entryRemoved(segmentHolder, path);
+        }
+      }
+      catch (KeeperException.NodeExistsException ne) {
+        // This is expected when historicals haven't yet picked up processing 
this segment and coordinator
+        // tries reassigning it to the same node.
+        log.warn(ne, "ZK node already exists because segment change request 
hasn't yet been processed");
+        failAssign(segmentHolder);
+      }
+      catch (Exception e) {
+        failAssign(segmentHolder, e);
       }
-    }
-    catch (Exception e) {
-      failAssign(e);
     }
   }
 
-  private void actionCompleted()
+  private void actionCompleted(SegmentHolder segmentHolder)
   {
-    if (currentlyProcessing != null) {
-      switch (currentlyProcessing.getType()) {
-        case LOAD:
-          segmentsToLoad.remove(currentlyProcessing.getSegment());
-          queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
-          break;
-        case DROP:
-          segmentsToDrop.remove(currentlyProcessing.getSegment());
-          break;
-        default:
-          throw new UnsupportedOperationException();
-      }
-
-      final List<LoadPeonCallback> callbacks = 
currentlyProcessing.getCallbacks();
-      currentlyProcessing = null;
-      callBackExecutor.execute(
-          () -> executeCallbacks(callbacks)
-      );
+    switch (segmentHolder.getType()) {
+      case LOAD:
+        segmentsToLoad.remove(segmentHolder.getSegment());
+        queuedSize.addAndGet(-segmentHolder.getSegmentSize());
+        break;
+      case DROP:
+        segmentsToDrop.remove(segmentHolder.getSegment());
+        break;
+      default:
+        throw new UnsupportedOperationException();
     }
+
+    callBackExecutor.execute(
+        () -> executeCallbacks(segmentHolder)
 
 Review comment:
  Pointless two-level submission: `executeCallbacks()` submits each callback 
to the same executor a second time. This suggests it would be valuable to 
annotate each method in this class with the executor it is supposed to run on, 
given that there are two different executors, plus the "external" client-code 
execution context (whose executor we arguably shouldn't need to know about).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to