liran-funaro commented on a change in pull request #10376:
URL: https://github.com/apache/druid/pull/10376#discussion_r500166537



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
##########
@@ -288,28 +297,48 @@ private void stopInternal()
   }
 
   @Override
-  public void collectReport(SubTaskReportType report)
+  public void collectLiveReport(RunningSubtaskReport report)
   {
-    // subTasks might send their reports multiple times because of the HTTP 
retry.
+    liveReportsMap.compute(report.getTaskId(), (taskId, prevReportCreatedTime) 
-> {
+      if (prevReportCreatedTime == null || prevReportCreatedTime != 
report.getCreatedTimeNs()) {
+        // TODO: the metrics in the report will be processed here.
+      }
+      taskMonitor.statusReport(report.getTaskId(), report.getState());
+      return report.getCreatedTimeNs();
+    });
+  }
+
+  @Override
+  public void collectReport(FinalReportType report)
+  {
+    // Even though each subtask is supposed to send its final report only 
once, supervisor task might receive
+    // the same report multiple times because of the HTTP retry.
     // Here, we simply make sure the current report is exactly same with the 
previous one.
-    reportsMap.compute(report.getTaskId(), (taskId, prevReport) -> {
-      if (prevReport != null) {
+    finalReportsMap.compute(report.getTaskId(), (taskId, prevReport) -> {
+      if (prevReport == null) {
+        // TODO: the metrics in the report will be processed here.

Review comment:
       What kind of processing is required here that wasn't needed before?
   The same question applies to `collectLiveReport()`.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int 
maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", 
estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = 
TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to 
TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with 
metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be 
missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of 
subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = 
runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = 
indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = 
taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch 
(Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories 
to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the 
spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating 
taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask 
because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), 
liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = 
indexingServiceClient.getTaskStatus(taskId);
+                  final TaskStatusPlus taskStatus = 
taskStatusResponse.getStatus();
+                  if (taskStatus != null) {
+                    switch 
(Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
+                      case SUCCESS:
+                        incrementNumSucceededTasks();
+
+                        // Remove the current entry after updating 
taskHistories to make sure that task history
                         // exists either runningTasks or taskHistories.
                         monitorEntry.setLastStatus(taskStatus);
                         iterator.remove();
-                      }
-                      break;
-                    case RUNNING:
-                      monitorEntry.updateStatus(taskStatus);
-                      break;
-                    default:
-                      throw new ISE("Unknown taskStatus[%s] for task[%s[", 
taskStatus.getStatusCode(), taskId);
+                        break;
+                      case FAILED:
+                        incrementNumFailedTasks();
+
+                        log.warn("task[%s] failed!", taskId);

Review comment:
       Should this be `taskId` or `specId`?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int 
maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", 
estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = 
TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to 
TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with 
metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be 
missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of 
subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = 
runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = 
indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = 
taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch 
(Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories 
to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the 
spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating 
taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask 
because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), 
liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = 
indexingServiceClient.getTaskStatus(taskId);

Review comment:
       This line wasn't modified, but `taskId` is used here even though it now 
holds what was `specId` before this PR.
   Is this intentional? Is `specId` equal to `taskId`?
   See the similar issue below.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
##########
@@ -379,24 +419,21 @@ public Granularity getSegmentGranularity()
       pushedSegments.addAll(pushed.getSegments());
       LOG.info("Pushed [%s] segments", pushed.getSegments().size());
       LOG.infoSegments(pushed.getSegments(), "Pushed segments");
-      appenderator.close();

Review comment:
       Nice catch!

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -144,6 +145,8 @@
 
   private final ConcurrentHashMap<Interval, AtomicInteger> 
partitionNumCountersPerInterval = new ConcurrentHashMap<>();
 
+  private final List<ParallelIndexTaskRunner<?, ?>> phaseRunners = new 
ArrayList<>();

Review comment:
       I don't see these `phaseRunners` being reused anywhere. What is the 
justification for keeping them in this list?
   Below, I see that you create them anyway in each method call, but use their 
enumeration to fetch them from the list.
   Do you assume a particular order of the calls?
   
   If this list was indeed intended to make the runners reusable, I suggest 
replacing it with an enum-keyed map (`Map<Enum, ParallelIndexTaskRunner>`) and 
creating each runner only if it has not been created before.
   

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int 
estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int 
maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = 
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", 
estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = 
TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by 
registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to 
TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with 
metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be 
missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of 
subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = 
runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = 
indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = 
taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch 
(Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories 
to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the 
spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating 
taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask 
because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), 
liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = 
indexingServiceClient.getTaskStatus(taskId);
+                  final TaskStatusPlus taskStatus = 
taskStatusResponse.getStatus();
+                  if (taskStatus != null) {
+                    switch 
(Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
+                      case SUCCESS:
+                        incrementNumSucceededTasks();
+
+                        // Remove the current entry after updating 
taskHistories to make sure that task history
                         // exists either runningTasks or taskHistories.
                         monitorEntry.setLastStatus(taskStatus);
                         iterator.remove();
-                      }
-                      break;
-                    case RUNNING:
-                      monitorEntry.updateStatus(taskStatus);
-                      break;
-                    default:
-                      throw new ISE("Unknown taskStatus[%s] for task[%s[", 
taskStatus.getStatusCode(), taskId);
+                        break;
+                      case FAILED:
+                        incrementNumFailedTasks();
+
+                        log.warn("task[%s] failed!", taskId);
+                        if (monitorEntry.numTries() < maxSubtaskRetries) {
+                          log.info(
+                              "We still have more chances[%d/%d] to process 
the spec[%s].",
+                              monitorEntry.numTries(),
+                              maxSubtaskRetries,
+                              monitorEntry.spec.getId()
+                          );
+                          retry(monitorEntry, taskStatus);
+                        } else {
+                          log.error(
+                              "spec[%s] failed after [%d] tries",
+                              monitorEntry.spec.getId(),
+                              monitorEntry.numTries()
+                          );
+                          // Remove the current entry after updating 
taskHistories to make sure that task history
+                          // exists either runningTasks or taskHistories.
+                          monitorEntry.setLastStatus(taskStatus);
+                          iterator.remove();
+                        }
+                        break;
+                      case RUNNING:
+                        monitorEntry.updateRunningStatus(taskStatus);
+                        break;
+                      default:
+                        throw new ISE("Unknown taskStatus[%s] for task[%s[", 
taskStatus.getStatusCode(), taskId);

Review comment:
       Should this be `taskId` or `specId`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to