egor-ryashin commented on a change in pull request #7088: Improve parallelism
of zookeeper based segment change processing
URL: https://github.com/apache/incubator-druid/pull/7088#discussion_r264848831
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
##########
@@ -290,128 +307,119 @@ private void processSegmentChangeRequest()
// We do not create the existence watcher first, because then it will fire when we create the
// node and we'll have the same race when trying to refresh that watcher.
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
-
- entryRemoved(path);
+ entryRemoved(segmentHolder, path);
}
}
+ catch (KeeperException.NodeExistsException ne) {
+ // This is expected when historicals haven't yet picked up processing this segment and coordinator
+ // tries reassigning it to the same node.
+ log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
+ failAssign(segmentHolder);
+ }
catch (Exception e) {
- failAssign(e);
+ failAssign(segmentHolder, e);
}
}
- private void actionCompleted()
+ private void actionCompleted(SegmentHolder segmentHolder)
{
- if (currentlyProcessing != null) {
- switch (currentlyProcessing.getType()) {
- case LOAD:
- segmentsToLoad.remove(currentlyProcessing.getSegment());
- queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
- break;
- case DROP:
- segmentsToDrop.remove(currentlyProcessing.getSegment());
- break;
- default:
- throw new UnsupportedOperationException();
- }
-
- final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
- currentlyProcessing = null;
- callBackExecutor.execute(
- () -> executeCallbacks(callbacks)
- );
+ switch (segmentHolder.getType()) {
+ case LOAD:
+ segmentsToLoad.remove(segmentHolder.getSegment());
+ queuedSize.addAndGet(-segmentHolder.getSegmentSize());
+ break;
+ case DROP:
+ segmentsToDrop.remove(segmentHolder.getSegment());
+ break;
+ default:
+ throw new UnsupportedOperationException();
}
+
+ callBackExecutor.execute(
+ () -> executeCallbacks(segmentHolder)
+ );
}
+
@Override
public void start()
{
- ScheduledExecutors.scheduleAtFixedRate(
- processingExecutor,
- config.getLoadQueuePeonRepeatDelay(),
- config.getLoadQueuePeonRepeatDelay(),
- () -> {
- processSegmentChangeRequest();
-
- if (stopped) {
- return ScheduledExecutors.Signal.STOP;
- } else {
- return ScheduledExecutors.Signal.REPEAT;
- }
- }
- );
+ for (int i = 0; i < numProcessingQueues; i++) {
+ processingExecutor.scheduleAtFixedRate(
+ new SegmentChangeProcessor(segmentProcessingQueues[i]),
+ 0,
+ config.getCuratorCreateZkNodesRepeatDelay().getMillis(),
Review comment:
The Coordinator can read 1 million segments, but it probably takes minutes
(not seconds) to do so. Moreover, for each segment the Coordinator has to
determine the location (which is the most time-consuming part) and then spread
those segments across a large pool of peons, so the overall rate is definitely
not 1 million segments per second. I would rather not complicate the peon logic
with those queues, considering they don't give a noticeable performance boost.
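
To put rough numbers on this argument, here is a back-of-the-envelope sketch. All inputs are hypothetical and chosen only for illustration (1 million segments, 200 seconds of Coordinator work, 100 peons, an even spread across peons); none of them are measurements from a real cluster, and `PeonRateEstimate` is not part of Druid.

```java
public class PeonRateEstimate {

    /**
     * Estimates how many segment change requests per second a single peon
     * would receive, assuming the Coordinator spreads the work evenly.
     * All parameters are illustrative assumptions, not Druid configuration.
     */
    static double perPeonRate(long totalSegments, long assignmentSeconds, int peonCount) {
        if (assignmentSeconds <= 0 || peonCount <= 0) {
            throw new IllegalArgumentException("assignmentSeconds and peonCount must be positive");
        }
        // Rate at which the Coordinator produces assignments for the whole cluster.
        double clusterRate = (double) totalSegments / assignmentSeconds;
        // Share of that rate that lands on one peon under an even spread.
        return clusterRate / peonCount;
    }

    public static void main(String[] args) {
        // Hypothetical inputs: 1 million segments, 200 seconds, 100 peons.
        double rate = perPeonRate(1_000_000L, 200L, 100);
        System.out.println("Per-peon rate (segments/s): " + rate);
    }
}
```

Under these assumed inputs each peon sees on the order of tens of requests per second, which is several orders of magnitude below 1 million per second.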
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services