capistrant commented on a change in pull request #10676:
URL: https://github.com/apache/druid/pull/10676#discussion_r603513473
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -566,6 +574,65 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
}
}
+ /**
+ * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+ * QoS method that can be used to make Batch Ingest tasks wait to finish until their ingested data is available on
+ * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
+ * for querying.
+ *
+ * @param toolbox {@link TaskToolbox} object for assisting with task work.
+ * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+ * @param waitTimeout Millis to wait before giving up
+ * @return True if all segments became available, otherwise False.
+ */
+ protected boolean waitForSegmentAvailability(
+ TaskToolbox toolbox,
+ List<DataSegment> segmentsToWaitFor,
+ long waitTimeout
+ )
+ {
+ if (segmentsToWaitFor.isEmpty()) {
+ log.info("Asked to wait for segments to be available, but I wasn't provided with any segments!?");
+ return true;
+ } else if (waitTimeout <= 0) {
+ log.warn("Asked to wait for availability for <= 0 seconds?! Requested waitTimeout: [%s]", waitTimeout);
+ return false;
+ }
+ log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size());
+
+ SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
+ .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource());
+ ExecutorService exec = Execs.directExecutor();
+ CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());
+
+ notifier.start();
+ for (DataSegment s : segmentsToWaitFor) {
+ notifier.registerSegmentHandoffCallback(
+ new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
+ exec,
+ () -> {
+ log.debug(
+ "Confirmed availability for [%s]. Removing from list of segments to wait for",
+ s.getId()
+ );
+ doneSignal.countDown();
+ }
+ );
+ }
+
+ try {
+ return doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for segment availability; Unable to confirm availability!");
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ finally {
+ notifier.close();
Review comment:
I agree with this thought. I'm unsure how the exact implementation should
look, so I'm pasting a block below before pushing it:
```
// try-with-resources closes the notifier on every exit path, replacing the explicit finally block.
try (
    SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
                                             .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource())
) {
  ExecutorService exec = Execs.directExecutor();
  CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());

  notifier.start();
  for (DataSegment s : segmentsToWaitFor) {
    notifier.registerSegmentHandoffCallback(
        new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
        exec,
        () -> {
          log.debug(
              "Confirmed availability for [%s]. Removing from list of segments to wait for",
              s.getId()
          );
          doneSignal.countDown();
        }
    );
  }

  try {
    return doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS);
  }
  catch (InterruptedException e) {
    log.warn("Interrupted while waiting for segment availability; Unable to confirm availability!");
    Thread.currentThread().interrupt();
    return false;
  }
}
```
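To sanity-check the close-on-every-path behavior outside of Druid, here is a minimal standalone sketch of the same latch plus try-with-resources pattern. Everything in it is hypothetical and for illustration only: `LatchWaitSketch`, `Notifier`, `RecordingNotifier`, and `waitForAll` are made-up names, and `Notifier` is a toy stand-in rather than the real `SegmentHandoffNotifier`. It assumes callbacks either fire inline during registration or never fire at all.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch
{
  /** Hypothetical stand-in for a handoff notifier; this is not Druid's interface. */
  public interface Notifier extends AutoCloseable
  {
    void register(String segmentId, Runnable callback);

    @Override
    void close(); // narrowed so that close() throws no checked exception
  }

  /** Toy notifier: fires callbacks inline or never, and records whether close() ran. */
  public static class RecordingNotifier implements Notifier
  {
    private final boolean fireImmediately;
    public boolean closed = false;

    public RecordingNotifier(boolean fireImmediately)
    {
      this.fireImmediately = fireImmediately;
    }

    @Override
    public void register(String segmentId, Runnable callback)
    {
      if (fireImmediately) {
        callback.run();
      }
    }

    @Override
    public void close()
    {
      closed = true;
    }
  }

  /** Returns true only if every registered callback fired before the timeout. */
  public static boolean waitForAll(Notifier notifier, List<String> segmentIds, long timeoutMillis)
  {
    // The resource is closed when the try block exits, before the catch clause runs.
    try (Notifier n = notifier) {
      CountDownLatch done = new CountDownLatch(segmentIds.size());
      for (String id : segmentIds) {
        n.register(id, done::countDown);
      }
      return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for callers
      return false;
    }
  }

  public static void main(String[] args)
  {
    RecordingNotifier ok = new RecordingNotifier(true);
    System.out.println(waitForAll(ok, Arrays.asList("a", "b"), 100L) + " closed=" + ok.closed);

    RecordingNotifier never = new RecordingNotifier(false);
    System.out.println(waitForAll(never, Arrays.asList("a"), 50L) + " closed=" + never.closed);
  }
}
```

In this sketch the notifier ends up closed on both the success path and the timeout path, which is the property the explicit `finally` block was providing.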