jihoonson commented on a change in pull request #10676:
URL: https://github.com/apache/druid/pull/10676#discussion_r566368729
##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -580,6 +588,64 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make batch ingestion tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox           {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout       Millis to wait before giving up
+   * @return true if all segments became available, otherwise false.
+   */
+  protected boolean waitForSegmentAvailability(
+      TaskToolbox toolbox,
+      List<DataSegment> segmentsToWaitFor,
+      long waitTimeout
+  )
+  {
+    if (segmentsToWaitFor.isEmpty()) {
+      log.info("Asked to wait for segments to be available, but I wasn't provided with any segments!?");
+      return true;
+    } else if (waitTimeout <= 0) {
+      log.warn("Asked to wait for availability for <= 0 millis?! Requested waitTimeout: [%s]", waitTimeout);
+      return false;
+    }
+    log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size());
+
+    SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
+                                             .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource());
+    ExecutorService exec = Execs.directExecutor();
+    CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());
+
+    notifier.start();
+    for (DataSegment s : segmentsToWaitFor) {
+      notifier.registerSegmentHandoffCallback(
+          new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
+          exec,
+          () -> {
+            log.debug(
+                "Confirmed availability for [%s]. Removing from list of segments to wait for",
+                s.getId()
+            );
+            doneSignal.countDown();
+          }
+      );
+    }
+
+    try {
+      return doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS);
+    }
+    catch (InterruptedException e) {
Review comment:
The interrupted state is cleared when `InterruptedException` is thrown. We should set the interrupt state of the current thread back. Please add `Thread.currentThread().interrupt()`.
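The suggested fix can be sketched with plain JDK classes; the class and method names below are illustrative, not from the Druid codebase:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterruptExample
{
  /**
   * Mirrors the shape of the catch block under review: await() clears the
   * thread's interrupted status when it throws InterruptedException, so we
   * restore the flag before returning to let callers up the stack observe
   * the interruption.
   */
  public static boolean awaitSegments(CountDownLatch doneSignal, long waitTimeoutMillis)
  {
    try {
      return doneSignal.await(waitTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // the suggested addition
      return false;
    }
  }

  public static void main(String[] args)
  {
    Thread.currentThread().interrupt(); // simulate a pending interrupt
    boolean available = awaitSegments(new CountDownLatch(1), 100L);
    // The interrupt flag survives the catch block thanks to the restore.
    System.out.println(available + " " + Thread.currentThread().isInterrupted()); // prints "false true"
  }
}
```

Without the restore, the final `isInterrupted()` check would print `false` and the interruption would be silently swallowed.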
##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -576,6 +584,64 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make batch ingestion tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox           {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout       Millis to wait before giving up
+   * @return true if all segments became available, otherwise false.
+   */
+  protected boolean waitForSegmentAvailability(
Review comment:
Yeah, I think you are right.
##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
##########
@@ -41,17 +41,22 @@
   @Nullable
   private String errorMsg;
+  @JsonProperty
+  private boolean segmentAvailabilityConfirmed;
+
   public IngestionStatsAndErrorsTaskReportData(
       @JsonProperty("ingestionState") IngestionState ingestionState,
       @JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
       @JsonProperty("rowStats") Map<String, Object> rowStats,
-      @JsonProperty("errorMsg") @Nullable String errorMsg
+      @JsonProperty("errorMsg") @Nullable String errorMsg,
+      @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed
Review comment:
> For us a simple yes/no will suffice. The cluster operators would have the goal of having 100% of jobs successfully hand off data before the timeout, but when that doesn't happen our users simply want to know that they may need to wait longer. We are simply trying to be transparent and report the point-in-time status. The onus of finding out when the data is fully loaded, if this timeout expired before loading, would fall on a different solution (TBD).

Cool, are you working on "the different solution"? That would be interesting too.

> Why the handoff failed is something I as an operator am more interested in than a user is (unless that user is also an operator). I think that would be very difficult to communicate in these reports, since the indexing task doesn't know much about what the rest of the cluster is doing.

I agree. I think we need more visibility into the coordinator's behavior.

> Knowing how long it took before the timeout could be found in the spec, but I guess it could be useful to add that value to the report as well if you think users would want a quick reference. I think that rather than having that static value, it could be cool to have the dynamic time waited for handoff. Maybe it equals the static value because we hit the timeout, but as an operator I would enjoy seeing how long each successful job waited for handoff. What do you think about that?

That seems useful to me too :+1:

As for the time to fail handoff: because of the above issue of not being able to know the cause of handoff failures, I was wondering whether the report can be a false alarm. For example, the report can say it failed to confirm that the segments were handed off, but maybe the handoff was never even triggered at all for some reason. I don't think this can happen for now, but it is possible in the future if someone modifies this area for some good reason. `segmentAvailabilityConfirmationCompleted` plus the time to fail handoff can be an indicator of such unexpected failures. I would say this is not a blocker for this PR, but it seems useful to me.
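The "dynamic time waited for handoff" idea could be captured roughly as follows, using only JDK classes. All names here (the class, the result holder, and its fields) are hypothetical illustrations, not part of this PR:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class HandoffWaitSketch
{
  /** Hypothetical holder for what a report entry could carry. */
  public static class WaitResult
  {
    public final boolean segmentAvailabilityConfirmed;
    public final long segmentAvailabilityWaitTimeMs;

    public WaitResult(boolean confirmed, long waitTimeMs)
    {
      this.segmentAvailabilityConfirmed = confirmed;
      this.segmentAvailabilityWaitTimeMs = waitTimeMs;
    }
  }

  /**
   * Waits on the handoff latch and records how long the task actually
   * waited, whether it confirmed availability or timed out.
   */
  public static WaitResult awaitHandoff(CountDownLatch doneSignal, long waitTimeoutMillis)
  {
    long startNanos = System.nanoTime();
    boolean confirmed;
    try {
      confirmed = doneSignal.await(waitTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt status
      confirmed = false;
    }
    long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    return new WaitResult(confirmed, waitedMs);
  }

  public static void main(String[] args)
  {
    // All segments already handed off: the latch starts at zero, so the
    // task confirms availability almost immediately rather than reporting
    // the full static timeout.
    WaitResult r = awaitHandoff(new CountDownLatch(0), 1000L);
    System.out.println(r.segmentAvailabilityConfirmed + " waited ~" + r.segmentAvailabilityWaitTimeMs + "ms");
  }
}
```

On a timeout, the recorded wait time approaches the configured timeout; on success it shows how long the job actually waited, which is the distinction the discussion above is after.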
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]