samarthjain commented on a change in pull request #7088: Improve parallelism of
zookeeper based segment change processing
URL: https://github.com/apache/incubator-druid/pull/7088#discussion_r279479722
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
##########
@@ -219,199 +220,182 @@ public void unmarkSegmentToDrop(DataSegment dataSegment)
segmentsMarkedToDrop.remove(dataSegment);
}
- private void processSegmentChangeRequest()
+ private class SegmentChangeProcessor implements Runnable
{
- if (currentlyProcessing != null) {
- log.debug(
- "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
- basePath,
- currentlyProcessing.getSegmentId()
- );
+ private final SegmentHolder segmentHolder;
- return;
- }
-
- if (!segmentsToDrop.isEmpty()) {
- currentlyProcessing = segmentsToDrop.firstEntry().getValue();
- log.debug("Server[%s] dropping [%s]", basePath,
currentlyProcessing.getSegmentId());
- } else if (!segmentsToLoad.isEmpty()) {
- currentlyProcessing = segmentsToLoad.firstEntry().getValue();
- log.debug("Server[%s] loading [%s]", basePath,
currentlyProcessing.getSegmentId());
- } else {
- return;
+ private SegmentChangeProcessor(SegmentHolder segmentHolder)
+ {
+ this.segmentHolder = segmentHolder;
}
- try {
- final String path = ZKPaths.makePath(basePath,
currentlyProcessing.getSegmentId().toString());
- final byte[] payload =
jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
- curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
-
- processingExecutor.schedule(
- () -> {
- try {
- if (curator.checkExists().forPath(path) != null) {
- failAssign(new ISE("%s was never removed! Failing this operation!", path));
+ @Override
+ public void run()
+ {
+ try {
+ final String path = ZKPaths.makePath(basePath,
segmentHolder.getSegmentIdentifier());
+ final byte[] payload =
jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest());
+ curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
+ log.debug(
+ "ZKNode created for server to [%s] %s [%s]",
+ basePath,
+ segmentHolder.getType() == LOAD ? "load" : "drop",
+ segmentHolder.getSegmentIdentifier()
+ );
+ final ScheduledFuture<?> future = monitorNodeRemovedExecutor.schedule(
+ () -> {
+ try {
+ if (curator.checkExists().forPath(path) != null) {
+ failAssign(segmentHolder, new ISE("%s was never removed! Failing this operation!", path));
+ } else {
+ log.debug("%s detected to be removed. ", path);
+ }
}
- }
- catch (Exception e) {
- failAssign(e);
- }
- },
- config.getLoadTimeoutDelay().getMillis(),
- TimeUnit.MILLISECONDS
- );
+ catch (Exception e) {
+ failAssign(segmentHolder, e);
+ }
+ },
+ config.getLoadTimeoutDelay().getMillis(),
+ TimeUnit.MILLISECONDS
+ );
- final Stat stat = curator.checkExists().usingWatcher(
- (CuratorWatcher) watchedEvent -> {
- switch (watchedEvent.getType()) {
- case NodeDeleted:
- entryRemoved(watchedEvent.getPath());
- break;
- default:
- // do nothing
+ final Stat stat = curator.checkExists().usingWatcher(
+ (CuratorWatcher) watchedEvent -> {
+ switch (watchedEvent.getType()) {
+ case NodeDeleted:
+ // Cancel the check node deleted task since we have already
+ // been notified by the zk watcher
+ future.cancel(true);
+ entryRemoved(segmentHolder, watchedEvent.getPath());
+ break;
+ default:
+ // do nothing
+ }
}
- }
- ).forPath(path);
-
- if (stat == null) {
- final byte[] noopPayload = jsonMapper.writeValueAsBytes(new
SegmentChangeRequestNoop());
-
- // Create a node and then delete it to remove the registered watcher.
This is a work-around for
- // a zookeeper race condition. Specifically, when you set a watcher,
it fires on the next event
- // that happens for that node. If no events happen, the watcher stays
registered foreverz.
- // Couple that with the fact that you cannot set a watcher when you
create a node, but what we
- // want is to create a node and then watch for it to get deleted. The
solution is that you *can*
- // set a watcher when you check to see if it exists so, we first
create the node and then set a
- // watcher on its existence. However, if already does not exist by
the time the existence check
- // returns, then the watcher that was set will never fire (nobody will
ever create the node
- // again) and thus lead to a slow, but real, memory leak. So, we
create another node to cause
- // that watcher to fire and delete it right away.
- //
- // We do not create the existence watcher first, because then it will
fire when we create the
- // node and we'll have the same race when trying to refresh that
watcher.
- curator.create().withMode(CreateMode.EPHEMERAL).forPath(path,
noopPayload);
-
- entryRemoved(path);
+ ).forPath(path);
+
+ if (stat == null) {
+ final byte[] noopPayload = jsonMapper.writeValueAsBytes(new
SegmentChangeRequestNoop());
+
+ // Create a node and then delete it to remove the registered
watcher. This is a work-around for
+ // a zookeeper race condition. Specifically, when you set a
watcher, it fires on the next event
+ // that happens for that node. If no events happen, the watcher
stays registered foreverz.
+ // Couple that with the fact that you cannot set a watcher when you
create a node, but what we
+ // want is to create a node and then watch for it to get deleted.
The solution is that you *can*
+ // set a watcher when you check to see if it exists so, we first
create the node and then set a
+ // watcher on its existence. However, if already does not exist by
the time the existence check
+ // returns, then the watcher that was set will never fire (nobody
will ever create the node
+ // again) and thus lead to a slow, but real, memory leak. So, we
create another node to cause
+ // that watcher to fire and delete it right away.
+ //
+ // We do not create the existence watcher first, because then it
will fire when we create the
+ // node and we'll have the same race when trying to refresh that
watcher.
+ curator.create().withMode(CreateMode.EPHEMERAL).forPath(path,
noopPayload);
+ entryRemoved(segmentHolder, path);
+ }
+ }
+ catch (KeeperException.NodeExistsException ne) {
+ // This is expected when historicals haven't yet picked up processing
this segment and coordinator
+ // tries reassigning it to the same node.
+ log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
+ failAssign(segmentHolder);
+ }
+ catch (Exception e) {
+ failAssign(segmentHolder, e);
}
- }
- catch (Exception e) {
- failAssign(e);
}
}
- private void actionCompleted()
+ private void actionCompleted(SegmentHolder segmentHolder)
{
- if (currentlyProcessing != null) {
- switch (currentlyProcessing.getType()) {
- case LOAD:
- segmentsToLoad.remove(currentlyProcessing.getSegment());
- queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
- break;
- case DROP:
- segmentsToDrop.remove(currentlyProcessing.getSegment());
- break;
- default:
- throw new UnsupportedOperationException();
- }
-
- final List<LoadPeonCallback> callbacks =
currentlyProcessing.getCallbacks();
- currentlyProcessing = null;
- callBackExecutor.execute(
- () -> executeCallbacks(callbacks)
- );
+ switch (segmentHolder.getType()) {
+ case LOAD:
+ segmentsToLoad.remove(segmentHolder.getSegment());
+ queuedSize.addAndGet(-segmentHolder.getSegmentSize());
+ break;
+ case DROP:
+ segmentsToDrop.remove(segmentHolder.getSegment());
+ break;
+ default:
+ throw new UnsupportedOperationException();
}
+
+ callBackExecutor.execute(
+ () -> executeCallbacks(segmentHolder)
Review comment:
Fixed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]