This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4775427e2c Add task start status to worker report (#13263)
4775427e2c is described below
commit 4775427e2c77840c19e1d2458b7e0bb0d01e76d8
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Fri Oct 28 12:00:15 2022 +0530
Add task start status to worker report (#13263)
* Add task start status to worker report
* Address review comments
* Address review comments
* Update documentation
* Update spelling checks
---
docs/multi-stage-query/api.md | 2 ++
.../org/apache/druid/msq/exec/ControllerImpl.java | 27 ++++++++++++++++++----
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 14 +++++++++++
.../druid/msq/indexing/report/MSQStatusReport.java | 21 ++++++++++++++++-
.../msq/indexing/report/MSQTaskReportTest.java | 8 ++++---
website/.spelling | 2 ++
6 files changed, 66 insertions(+), 8 deletions(-)
diff --git a/docs/multi-stage-query/api.md b/docs/multi-stage-query/api.md
index 5de3ed1273..b81370c42a 100644
--- a/docs/multi-stage-query/api.md
+++ b/docs/multi-stage-query/api.md
@@ -553,6 +553,8 @@ The following table describes the response fields when you
retrieve a report for
|multiStageQuery.payload.status.status|RUNNING, SUCCESS, or FAILED.|
|multiStageQuery.payload.status.startTime|Start time of the query in ISO
format. Only present if the query has started running.|
|multiStageQuery.payload.status.durationMs|Milliseconds elapsed after the
query has started running. -1 denotes that the query hasn't started running
yet.|
+|multiStageQuery.payload.status.pendingTasks|Number of worker tasks that have
not yet fully started. -1 denotes that the number is currently unknown.|
+|multiStageQuery.payload.status.runningTasks|Number of currently running
tasks, including the controller task, so the value is always at least 1.|
|multiStageQuery.payload.status.errorReport|Error object. Only present if
there was an error.|
|multiStageQuery.payload.status.errorReport.taskId|The task that reported the
error, if known. May be a controller task or a worker task.|
|multiStageQuery.payload.status.errorReport.host|The hostname and port of the
task that reported the error, if known.|
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 68d73b2efd..8686abbc2d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -447,7 +447,8 @@ public class ControllerImpl implements Controller
errorForReport,
workerWarnings,
queryStartTime,
- new Interval(queryStartTime,
DateTimes.nowUtc()).toDurationMillis()
+ new Interval(queryStartTime,
DateTimes.nowUtc()).toDurationMillis(),
+ workerTaskLauncher
),
stagesReport,
countersSnapshot,
@@ -716,7 +717,8 @@ public class ControllerImpl implements Controller
null,
workerWarnings,
queryStartTime,
- queryStartTime == null ? -1L : new
Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis()
+ queryStartTime == null ? -1L : new
Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
+ workerTaskLauncher
),
makeStageReport(
queryDef,
@@ -1807,10 +1809,27 @@ public class ControllerImpl implements Controller
@Nullable final MSQErrorReport errorReport,
final Queue<MSQErrorReport> errorReports,
@Nullable final DateTime queryStartTime,
- final long queryDuration
+ final long queryDuration,
+ MSQWorkerTaskLauncher taskLauncher
)
{
- return new MSQStatusReport(taskState, errorReport, errorReports,
queryStartTime, queryDuration);
+ int pendingTasks = -1;
+ int runningTasks = 1;
+
+ if (taskLauncher != null) {
+ Pair<Integer, Integer> workerTaskStatus =
taskLauncher.getWorkerTaskStatus();
+ pendingTasks = workerTaskStatus.lhs;
+ runningTasks = workerTaskStatus.rhs + 1; // To account for controller.
+ }
+ return new MSQStatusReport(
+ taskState,
+ errorReport,
+ errorReports,
+ queryStartTime,
+ queryDuration,
+ pendingTasks,
+ runningTasks
+ );
}
private static InputSpecSlicerFactory makeInputSpecSlicerFactory(final
DataSegmentTimelineView timelineView)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index f84e067931..12c869d23c 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
@@ -345,6 +346,19 @@ public class MSQWorkerTaskLauncher
}
}
+  /**
+   * Returns a snapshot of worker task progress as a (pending, running) pair.
+   *
+   * The left value is the number of worker tasks that are not yet fully started, computed as the desired worker
+   * count minus the number of fully started workers. The right value is the number of worker tasks that are fully
+   * started. The controller task is not counted in either value; callers that want it included must add it
+   * themselves.
+   *
+   * Both values are read while holding the {@code taskIds} monitor, so they come from a single consistent view.
+   *
+   * @return pair whose {@code lhs} is the pending worker count and whose {@code rhs} is the running worker count
+   */
+  public Pair<Integer, Integer> getWorkerTaskStatus()
+  {
+    synchronized (taskIds) {
+      final int runningTasks = fullyStartedTasks.size();
+      final int pendingTasks = desiredTaskCount - runningTasks;
+      // Pending goes on the left and running on the right: the status report reads lhs as "pendingTasks" and
+      // rhs as "runningTasks". Returning them in the opposite order would swap the two values in the report.
+      return Pair.of(pendingTasks, runningTasks);
+    }
+  }
+
/**
* Used by the main loop to update {@link #taskTrackers} and {@link
#fullyStartedTasks}.
*/
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
index d2849d8b2c..3791bc82e1 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java
@@ -46,6 +46,9 @@ public class MSQStatusReport
private final long durationMs;
+ private final int pendingTasks;
+
+ private final int runningTasks;
@JsonCreator
public MSQStatusReport(
@@ -53,7 +56,9 @@ public class MSQStatusReport
@JsonProperty("errorReport") @Nullable MSQErrorReport errorReport,
@JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
@JsonProperty("startTime") @Nullable DateTime startTime,
- @JsonProperty("durationMs") long durationMs
+ @JsonProperty("durationMs") long durationMs,
+ @JsonProperty("pendingTasks") int pendingTasks,
+ @JsonProperty("runningTasks") int runningTasks
)
{
this.status = Preconditions.checkNotNull(status, "status");
@@ -61,6 +66,8 @@ public class MSQStatusReport
this.warningReports = warningReports != null ? warningReports :
Collections.emptyList();
this.startTime = startTime;
this.durationMs = durationMs;
+ this.pendingTasks = pendingTasks;
+ this.runningTasks = runningTasks;
}
@JsonProperty
@@ -92,6 +99,18 @@ public class MSQStatusReport
return startTime;
}
+  /**
+   * Number of worker tasks that have not yet fully started. The controller reports -1 when this number is
+   * unknown (it uses -1 when no worker task launcher is available). Serialized under the property name
+   * "pendingTasks".
+   */
+  @JsonProperty
+  public int getPendingTasks()
+  {
+    return this.pendingTasks;
+  }
+
+  /**
+   * Number of currently running tasks as reported by the controller. This count includes the controller task
+   * itself, so it is normally at least 1. Serialized under the property name "runningTasks".
+   */
+  @JsonProperty
+  public int getRunningTasks()
+  {
+    return this.runningTasks;
+  }
+
@JsonProperty
public long getDurationMs()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 2e4fc794cf..4902675509 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -91,7 +91,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0),
+ new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, 1, 2),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -119,6 +119,8 @@ public class MSQTaskReportTest
Assert.assertEquals(TASK_ID, report2.getTaskId());
Assert.assertEquals(report.getPayload().getStatus().getStatus(),
report2.getPayload().getStatus().getStatus());
Assert.assertNull(report2.getPayload().getStatus().getErrorReport());
+ Assert.assertEquals(report.getPayload().getStatus().getRunningTasks(),
report2.getPayload().getStatus().getRunningTasks());
+ Assert.assertEquals(report.getPayload().getStatus().getPendingTasks(),
report2.getPayload().getStatus().getPendingTasks());
Assert.assertEquals(report.getPayload().getStages(),
report2.getPayload().getStages());
Yielder<Object[]> yielder =
report2.getPayload().getResults().getResultYielder();
@@ -142,7 +144,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.FAILED, errorReport, new
ArrayDeque<>(), null, 0),
+ new MSQStatusReport(TaskState.FAILED, errorReport, new
ArrayDeque<>(), null, 0, 1, 2),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -179,7 +181,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0),
+ new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(),
null, 0, 1, 2),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
diff --git a/website/.spelling b/website/.spelling
index bbabc3160e..5f9e60ff58 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -632,6 +632,8 @@ multiStageQuery.payload.status
multiStageQuery.payload.status.status
multiStageQuery.payload.status.startTime
multiStageQuery.payload.status.durationMs
+multiStageQuery.payload.status.pendingTasks
+multiStageQuery.payload.status.runningTasks
multiStageQuery.payload.status.errorReport
multiStageQuery.payload.status.errorReport.taskId
multiStageQuery.payload.status.errorReport.host
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]