samarthjain commented on a change in pull request #7088: Improve parallelism of zookeeper based segment change processing
URL: https://github.com/apache/incubator-druid/pull/7088#discussion_r264826467
 
 

 ##########
 File path: 
server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
 ##########
 @@ -290,128 +307,119 @@ private void processSegmentChangeRequest()
         // We do not create the existence watcher first, because then it will fire when we create the
         // node and we'll have the same race when trying to refresh that watcher.
         curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
-
-        entryRemoved(path);
+        entryRemoved(segmentHolder, path);
       }
     }
+    catch (KeeperException.NodeExistsException ne) {
+      // This is expected when historicals haven't yet picked up processing this segment and coordinator
+      // tries reassigning it to the same node.
+      log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
+      failAssign(segmentHolder);
+    }
     catch (Exception e) {
-      failAssign(e);
+      failAssign(segmentHolder, e);
     }
   }
 
-  private void actionCompleted()
+  private void actionCompleted(SegmentHolder segmentHolder)
   {
-    if (currentlyProcessing != null) {
-      switch (currentlyProcessing.getType()) {
-        case LOAD:
-          segmentsToLoad.remove(currentlyProcessing.getSegment());
-          queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
-          break;
-        case DROP:
-          segmentsToDrop.remove(currentlyProcessing.getSegment());
-          break;
-        default:
-          throw new UnsupportedOperationException();
-      }
-
-      final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
-      currentlyProcessing = null;
-      callBackExecutor.execute(
-          () -> executeCallbacks(callbacks)
-      );
+    switch (segmentHolder.getType()) {
+      case LOAD:
+        segmentsToLoad.remove(segmentHolder.getSegment());
+        queuedSize.addAndGet(-segmentHolder.getSegmentSize());
+        break;
+      case DROP:
+        segmentsToDrop.remove(segmentHolder.getSegment());
+        break;
+      default:
+        throw new UnsupportedOperationException();
     }
+
+    callBackExecutor.execute(
+        () -> executeCallbacks(segmentHolder)
+    );
   }
 
+
   @Override
   public void start()
   {
-    ScheduledExecutors.scheduleAtFixedRate(
-        processingExecutor,
-        config.getLoadQueuePeonRepeatDelay(),
-        config.getLoadQueuePeonRepeatDelay(),
-        () -> {
-          processSegmentChangeRequest();
-
-          if (stopped) {
-            return ScheduledExecutors.Signal.STOP;
-          } else {
-            return ScheduledExecutors.Signal.REPEAT;
-          }
-        }
-    );
+    for (int i = 0; i < numProcessingQueues; i++) {
+      processingExecutor.scheduleAtFixedRate(
+          new SegmentChangeProcessor(segmentProcessingQueues[i]),
+          0,
+          config.getCuratorCreateZkNodesRepeatDelay().getMillis(),
 
 Review comment:
   A rate of 1 million per second or higher is likely. The Coordinator polls the RDS for segments and then calls load on them in a loop. A large cluster with lots of data and many datasources can easily have more than a million segments.
   
   According to the BlockingQueue Javadoc, calling drainTo() may be more efficient than repeatedly polling the queue, and I think that extends to repeatedly calling take().
   ```
   /**
        * Removes all available elements from this queue and adds them
        * to the given collection.  This operation may be more
        * efficient than repeatedly polling this queue.  
   ```
   
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html#drainTo-java.util.Collection-
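A minimal sketch of the two ways to pull a batch off a `BlockingQueue`, assuming a plain `LinkedBlockingQueue<String>` stands in for the real queue of segment holders; the class `DrainToVsPoll` and its helpers are hypothetical. Because `take()` blocks when the queue is empty, a batch-fetching loop would in practice use the non-blocking `poll()`; `drainTo(Collection, int)` does the same job in one call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class DrainToVsPoll
{
  // Hypothetical helper: fetch up to maxBatch elements by polling one at a time.
  static <T> List<T> pollBatch(BlockingQueue<T> queue, int maxBatch)
  {
    List<T> batch = new ArrayList<>();
    T next;
    // poll() returns null immediately on an empty queue, whereas take() would block.
    while (batch.size() < maxBatch && (next = queue.poll()) != null) {
      batch.add(next);
    }
    return batch;
  }

  // Hypothetical helper: fetch up to maxBatch elements with a single drainTo() call.
  static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxBatch)
  {
    List<T> batch = new ArrayList<>();
    queue.drainTo(batch, maxBatch);
    return batch;
  }

  public static void main(String[] args)
  {
    BlockingQueue<String> a = new LinkedBlockingQueue<>();
    BlockingQueue<String> b = new LinkedBlockingQueue<>();
    for (int i = 0; i < 5; i++) {
      a.add("segment-" + i);
      b.add("segment-" + i);
    }
    System.out.println(pollBatch(a, 3));            // [segment-0, segment-1, segment-2]
    System.out.println(drainBatch(b, 3));           // [segment-0, segment-1, segment-2]
    System.out.println(a.size() + " " + b.size());  // 2 2
  }
}
```

Both helpers return the same elements in FIFO order; the difference is that `drainTo` lets the queue move the whole batch in one operation (in OpenJDK's `LinkedBlockingQueue`, it acquires its take lock once for the batch rather than once per element).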
   
   Further, it makes the code simpler, since a single call fetches a batch's worth of records instead of calling take() in a loop.
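To show how batching could fit the fixed-rate processors started in the diff's `start()` method, here is a minimal sketch assuming one `BlockingQueue` per processor; `BatchingProcessor` is a hypothetical stand-in for `SegmentChangeProcessor`, and the queue count, batch cap, and 10 ms period are made-up values, not the PR's actual implementation. Each scheduled run drains whatever is available (up to the cap) with one `drainTo()` call and hands the batch to a handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class BatchingDemo
{
  // Hypothetical stand-in for SegmentChangeProcessor: each scheduled run drains one batch.
  static final class BatchingProcessor<T> implements Runnable
  {
    private final BlockingQueue<T> queue;
    private final int maxBatch;
    private final Consumer<List<T>> handler;

    BatchingProcessor(BlockingQueue<T> queue, int maxBatch, Consumer<List<T>> handler)
    {
      this.queue = queue;
      this.maxBatch = maxBatch;
      this.handler = handler;
    }

    @Override
    public void run()
    {
      List<T> batch = new ArrayList<>();
      // A single non-blocking call moves up to maxBatch available elements.
      queue.drainTo(batch, maxBatch);
      if (!batch.isEmpty()) {
        handler.accept(batch);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final int numQueues = 2;      // hypothetical, like numProcessingQueues in the diff
    final int maxBatch = 100;     // hypothetical batch cap
    final int numRequests = 1000; // hypothetical request count
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(numQueues);
    List<BlockingQueue<Integer>> queues = new ArrayList<>();
    CountDownLatch done = new CountDownLatch(numRequests);

    for (int i = 0; i < numQueues; i++) {
      BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
      queues.add(queue);
      // Mirrors the loop in start(): one fixed-rate task per processing queue.
      executor.scheduleAtFixedRate(
          new BatchingProcessor<>(queue, maxBatch, batch -> batch.forEach(request -> done.countDown())),
          0,
          10,
          TimeUnit.MILLISECONDS
      );
    }

    // Spread the hypothetical requests round-robin across the queues.
    for (int i = 0; i < numRequests; i++) {
      queues.get(i % numQueues).add(i);
    }

    boolean finished = done.await(5, TimeUnit.SECONDS);
    executor.shutdownNow();
    System.out.println("all requests processed: " + finished);
  }
}
```

One design point to keep in mind: per the `ScheduledExecutorService.scheduleAtFixedRate` Javadoc, if any execution of the task throws, subsequent executions are suppressed. A real processor should therefore handle failures per item (as the diff does by routing exceptions to `failAssign`) rather than let them escape `run()`.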
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
