jihoonson commented on a change in pull request #10676:
URL: https://github.com/apache/druid/pull/10676#discussion_r566368729



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -580,6 +588,64 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
 
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make Batch Ingest tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox           {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout Millis to wait before giving up
+   * @return True if all segments became available, otherwise False.
+   */
+  protected boolean waitForSegmentAvailability(
+      TaskToolbox toolbox,
+      List<DataSegment> segmentsToWaitFor,
+      long waitTimeout
+  )
+  {
+    if (segmentsToWaitFor.isEmpty()) {
+      log.info("Asked to wait for segments to be available, but no segments were provided.");
+      return true;
+    } else if (waitTimeout <= 0) {
+      log.warn("Asked to wait for availability with a non-positive timeout. Requested waitTimeout: [%s] ms", waitTimeout);
+      return false;
+    }
+    log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size());
+
+    SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
+                                             .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource());
+    ExecutorService exec = Execs.directExecutor();
+    CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());
+
+    notifier.start();
+    for (DataSegment s : segmentsToWaitFor) {
+      notifier.registerSegmentHandoffCallback(
+          new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
+          exec,
+          () -> {
+            log.debug(
+                "Confirmed availability for [%s]. Removing from list of segments to wait for.",
+                s.getId()
+            );
+            doneSignal.countDown();
+          }
+      );
+    }
+
+    try {
+      return doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS);
+    }
+    catch (InterruptedException e) {

Review comment:
       Catching `InterruptedException` clears the thread's interrupted status. We should restore the interrupted state of the current thread: please add `Thread.currentThread().interrupt()` to the catch block.
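The suggested fix can be sketched in isolation. The class and method names below are illustrative, not from the PR; the point is only that restoring the flag in the catch block preserves the interrupt for callers further up the stack:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterruptRestoreDemo {
    // Sketch of the pattern the review asks for: if await() is interrupted,
    // restore the thread's interrupt flag before returning, since throwing
    // InterruptedException clears it.
    static boolean awaitWithInterruptRestore(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status for callers
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            boolean done = awaitWithInterruptRestore(latch, 10_000);
            // The flag is still set here because the catch block restored it.
            System.out.println("done=" + done + " interrupted=" + Thread.currentThread().isInterrupted());
        });
        t.start();
        t.interrupt();
        t.join();
        // prints: done=false interrupted=true
    }
}
```

Without the `interrupt()` call, the `isInterrupted()` check above would print `false`, and code higher in the call stack would never learn the task was asked to stop.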

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -576,6 +584,64 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
 
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make Batch Ingest tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox           {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout Millis to wait before giving up
+   * @return True if all segments became available, otherwise False.
+   */
+  protected boolean waitForSegmentAvailability(

Review comment:
       Yeah, I think you are right.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
##########
@@ -41,17 +41,22 @@
   @Nullable
   private String errorMsg;
 
+  @JsonProperty
+  private boolean segmentAvailabilityConfirmed;
+
   public IngestionStatsAndErrorsTaskReportData(
       @JsonProperty("ingestionState") IngestionState ingestionState,
       @JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
       @JsonProperty("rowStats") Map<String, Object> rowStats,
-      @JsonProperty("errorMsg") @Nullable String errorMsg
+      @JsonProperty("errorMsg") @Nullable String errorMsg,
+      @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed

Review comment:
       > For us a simple yes/no will suffice. The cluster operators would have the goal of having 100% of jobs successfully hand off data before the timeout, but when that doesn't happen our users simply want to know that they may need to wait longer. We are simply trying to be transparent and report the point-in-time status. The onus of finding out when the data is fully loaded, if this timeout expired before loading, would fall on a different solution (TBD).
   
   Cool, are you working on "the different solution"? That would be interesting too.
   
   > Why the handoff failed is something I as an operator am more interested in than a user would be (unless that user is also an operator). I think that would be very difficult to communicate in these reports, since the indexing task doesn't know much about what the rest of the cluster is doing.
   
   I agree. I think we need more visibility into the coordinator behavior.
   
   > Knowing how long it took before the timeout could be found in the spec, but I guess it could be useful to add that value to the report as well if you think users would want a quick reference. Rather than that static value, it could be cool to report the dynamic time waited for handoff. It would only equal the static value when we hit the timeout, and as an operator I would enjoy seeing how long each successful job waited for handoff. What do you think about that?
   
   That seems useful to me too :+1: 
   
   As for the time to fail handoff: given the above lack of visibility into the cause of handoff failures, I was wondering whether the report could be a false alarm. For example, the report could say it failed to confirm that the segments were handed off, when in fact the handoff was never even triggered for some reason. I don't think this can happen today, but it could in the future if someone modifies this area for some good reason. `segmentAvailabilityConfirmationCompleted` plus the time to fail handoff could be an indicator of such unexpected failures. I would say this is not a blocker for this PR, but it seems useful to me.
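The "dynamic time waited for handoff" idea discussed above could be sketched as follows. The class, field, and method names here are hypothetical, not part of the PR; the sketch just pairs the yes/no confirmation flag with the actual elapsed wait time so an operator can distinguish a quick confirmation from one that ran up against the timeout:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AvailabilityWaitReport {
    // Hypothetical report fields (names are illustrative, not from the PR):
    // the confirmation flag plus the dynamic time actually spent waiting.
    private final boolean segmentAvailabilityConfirmed;
    private final long segmentAvailabilityWaitTimeMs;

    public AvailabilityWaitReport(boolean confirmed, long waitTimeMs) {
        this.segmentAvailabilityConfirmed = confirmed;
        this.segmentAvailabilityWaitTimeMs = waitTimeMs;
    }

    public boolean isSegmentAvailabilityConfirmed() {
        return segmentAvailabilityConfirmed;
    }

    public long getSegmentAvailabilityWaitTimeMs() {
        return segmentAvailabilityWaitTimeMs;
    }

    // Measure how long the latch wait actually took, whether it was
    // confirmed (all segments counted down) or timed out.
    public static AvailabilityWaitReport awaitAndMeasure(CountDownLatch latch, long timeoutMs)
            throws InterruptedException {
        long start = System.nanoTime();
        boolean confirmed = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new AvailabilityWaitReport(confirmed, waitedMs);
    }
}
```

On a confirmed run the wait time is the useful dynamic value; on a timed-out run it roughly equals the configured timeout, matching the reviewer's observation that the static value only appears when the timeout is hit.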




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


