This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5bb5b41b188 Adding task pending time in MSQ reports (#15966)
5bb5b41b188 is described below
commit 5bb5b41b188b83778ebc459a1ad036e06f7a558a
Author: Karan Kumar <[email protected]>
AuthorDate: Tue Feb 27 14:41:28 2024 +0530
Adding task pending time in MSQ reports (#15966)
Added a new field pendingMs in MSQ task reports. This helps in figuring
out the exact run time of the MSQ worker tasks.
Fixed data races.
---
docs/api-reference/sql-ingestion-api.md | 13 +-
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 172 +++++++++++++--------
2 files changed, 119 insertions(+), 66 deletions(-)
diff --git a/docs/api-reference/sql-ingestion-api.md
b/docs/api-reference/sql-ingestion-api.md
index 5492c3ea46d..988f860a85d 100644
--- a/docs/api-reference/sql-ingestion-api.md
+++ b/docs/api-reference/sql-ingestion-api.md
@@ -287,6 +287,16 @@ The response shows an example report for a query.
"status": "SUCCESS",
"startTime": "2022-09-14T22:12:09.266Z",
"durationMs": 28227,
+ "workers": {
+ "0": [
+ {
+ "workerId":
"query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e-worker0_0",
+ "state": "SUCCESS",
+ "durationMs": 15511,
+ "pendingMs": 137
+ }
+ ]
+ },
"pendingTasks": 0,
"runningTasks": 2,
"segmentLoadStatus": {
@@ -607,7 +617,8 @@ The following table describes the response fields when you
retrieve a report for
| `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker
tasks including retries. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | Id of
the worker task. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].state` | RUNNING,
SUCCESS, or FAILED.|
-| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` |
Milliseconds elapsed after the worker task started running. It is -1 for worker
tasks with status RUNNING.|
+| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` |
Milliseconds elapsed between when the worker task was first requested and when
it finished. It is -1 for worker tasks with status RUNNING.|
+| `multiStageQuery.payload.status.workers.<workerNumber>[].pendingMs` |
Milliseconds elapsed between when the worker task was first requested and when
it fully started RUNNING. Actual work time can be calculated using
`actualWorkTimeMs = durationMs - pendingMs`.|
| `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not
fully started. -1 denotes that the number is currently unknown. |
| `multiStageQuery.payload.status.runningTasks` | Number of currently running
tasks. Should be at least 1 since the controller is included. |
| `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading
container. Only present after the segments have been published. |
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index c2092e7f24a..a485831532f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -306,7 +306,6 @@ public class MSQWorkerTaskLauncher
* Blocks the call until the worker tasks are ready to be contacted for
work.
*
* @param workerSet
- *
* @throws InterruptedException
*/
public void waitUntilWorkersReady(Set<Integer> workerSet) throws
InterruptedException
@@ -353,45 +352,6 @@ public class MSQWorkerTaskLauncher
}
}
- public static class WorkerStats
- {
- String workerId;
- TaskState state;
- long duration;
-
- /**
- * For JSON deserialization only
- */
- public WorkerStats()
- {
- }
-
- public WorkerStats(String workerId, TaskState state, long duration)
- {
- this.workerId = workerId;
- this.state = state;
- this.duration = duration;
- }
-
- @JsonProperty
- public String getWorkerId()
- {
- return workerId;
- }
-
- @JsonProperty
- public TaskState getState()
- {
- return state;
- }
-
- @JsonProperty("durationMs")
- public long getDuration()
- {
- return duration;
- }
- }
-
public Map<Integer, List<WorkerStats>> getWorkerStats()
{
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();
@@ -400,14 +360,17 @@ public class MSQWorkerTaskLauncher
TaskTracker taskTracker = taskEntry.getValue();
+ TaskStatus taskStatus = taskTracker.statusRef.get();
workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new
ArrayList<>())
- .add(new WorkerStats(taskEntry.getKey(),
- taskTracker.status.getStatusCode(),
- // getDuration() returns -1 for running
tasks.
- // It's not calculated on-the-fly here
since
- // taskTracker.startTimeMillis marks task
- // submission time rather than the
actual start.
- taskTracker.status.getDuration()
+ .add(new WorkerStats(
+ taskEntry.getKey(),
+ taskStatus.getStatusCode(),
+ // getDuration() returns -1 for running tasks.
+ // It's not calculated on-the-fly here since
+ // taskTracker.startTimeMillis marks task
+ // submission time rather than the actual start.
+ taskStatus.getDuration(),
+ taskTracker.taskPendingTimeInMs()
));
}
@@ -576,18 +539,20 @@ public class MSQWorkerTaskLauncher
for (Map.Entry<String, TaskStatus> statusEntry : statuses.entrySet()) {
final String taskId = statusEntry.getKey();
final TaskTracker tracker = taskTrackers.get(taskId);
- tracker.status = statusEntry.getValue();
+ tracker.updateStatus(statusEntry.getValue());
+ TaskStatus status = tracker.statusRef.get();
- if (!tracker.status.getStatusCode().isComplete() &&
tracker.unknownLocation()) {
+ if (!status.getStatusCode().isComplete() && tracker.unknownLocation())
{
// Look up location if not known. Note: this location is not used to
actually contact the task. For that,
// we have SpecificTaskServiceLocator. This location is only used to
determine if a task has started up.
- tracker.initialLocation = workerManager.location(taskId);
+ tracker.setLocation(workerManager.location(taskId));
}
- if (tracker.status.getStatusCode() == TaskState.RUNNING &&
!tracker.unknownLocation()) {
+ if (status.getStatusCode() == TaskState.RUNNING &&
!tracker.unknownLocation()) {
synchronized (taskIds) {
if (fullyStartedTasks.add(tracker.workerNumber)) {
recentFullyStartedWorkerTimeInMillis.set(System.currentTimeMillis());
+ tracker.setFullyStartedTime(System.currentTimeMillis());
}
taskIds.notifyAll();
}
@@ -616,7 +581,7 @@ public class MSQWorkerTaskLauncher
continue;
}
- if (tracker.status == null) {
+ if (tracker.statusRef.get() == null) {
removeWorkerFromFullyStartedWorkers(tracker);
final String errorMessage = StringUtils.format("Task [%s] status
missing", taskId);
log.info(errorMessage + ". Trying to relaunch the worker");
@@ -635,9 +600,10 @@ public class MSQWorkerTaskLauncher
));
} else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
removeWorkerFromFullyStartedWorkers(tracker);
- log.info("Task[%s] failed because %s. Trying to relaunch the worker",
taskId, tracker.status.getErrorMsg());
+ TaskStatus taskStatus = tracker.statusRef.get();
+ log.info("Task[%s] failed because %s. Trying to relaunch the worker",
taskId, taskStatus.getErrorMsg());
tracker.enableRetrying();
- retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId,
tracker.status.getErrorMsg()));
+ retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId,
taskStatus.getErrorMsg()));
}
}
}
@@ -717,7 +683,7 @@ public class MSQWorkerTaskLauncher
Limits.PER_WORKER_RELAUNCH_LIMIT,
relaunchTask.getId(),
relaunchTask.getWorkerNumber(),
- tracker.status.getErrorMsg()
+ tracker.statusRef.get().getErrorMsg()
));
}
if (currentRelaunchCount > Limits.TOTAL_RELAUNCH_LIMIT) {
@@ -725,7 +691,7 @@ public class MSQWorkerTaskLauncher
Limits.TOTAL_RELAUNCH_LIMIT,
currentRelaunchCount,
relaunchTask.getId(),
- tracker.status.getErrorMsg()
+ tracker.statusRef.get().getErrorMsg()
));
}
}
@@ -737,8 +703,9 @@ public class MSQWorkerTaskLauncher
for (final Map.Entry<String, TaskTracker> taskEntry :
taskTrackers.entrySet()) {
final String taskId = taskEntry.getKey();
final TaskTracker tracker = taskEntry.getValue();
- if (!canceledWorkerTasks.contains(taskId)
- && (tracker.status == null ||
!tracker.status.getStatusCode().isComplete())) {
+ if ((!canceledWorkerTasks.contains(taskId))
+ &&
+ (!tracker.isComplete())) {
canceledWorkerTasks.add(taskId);
context.workerManager().cancel(taskId);
}
@@ -831,11 +798,12 @@ public class MSQWorkerTaskLauncher
{
private final int workerNumber;
private final long startTimeMillis = System.currentTimeMillis();
+ private final AtomicLong taskFullyStartedTimeRef = new AtomicLong();
private final MSQWorkerTask msqWorkerTask;
- private TaskStatus status;
- private TaskLocation initialLocation;
+ private final AtomicReference<TaskStatus> statusRef = new
AtomicReference<>();
+ private final AtomicReference<TaskLocation> initialLocationRef = new
AtomicReference<>();
- private boolean isRetrying = false;
+ private final AtomicBoolean isRetryingRef = new AtomicBoolean(false);
public TaskTracker(int workerNumber, MSQWorkerTask msqWorkerTask)
{
@@ -845,16 +813,19 @@ public class MSQWorkerTaskLauncher
public boolean unknownLocation()
{
+ TaskLocation initialLocation = initialLocationRef.get();
return initialLocation == null ||
TaskLocation.unknown().equals(initialLocation);
}
public boolean isComplete()
{
+ TaskStatus status = statusRef.get();
return status != null && status.getStatusCode().isComplete();
}
public boolean didFail()
{
+ TaskStatus status = statusRef.get();
return status != null && status.getStatusCode().isFailure();
}
@@ -869,6 +840,7 @@ public class MSQWorkerTaskLauncher
public boolean didRunTimeOut(final long maxTaskStartDelayMillis)
{
long currentTimeMillis = System.currentTimeMillis();
+ TaskStatus status = statusRef.get();
return (status == null || status.getStatusCode() == TaskState.RUNNING)
&& unknownLocation()
&& currentTimeMillis - recentFullyStartedWorkerTimeInMillis.get()
> maxTaskStartDelayMillis
@@ -880,17 +852,87 @@ public class MSQWorkerTaskLauncher
*/
public void enableRetrying()
{
- isRetrying = true;
+ isRetryingRef.set(true);
}
/**
* Checks if the task is retrying.
- *
- * @return
*/
public boolean isRetrying()
{
- return isRetrying;
+ return isRetryingRef.get();
+ }
+
+ public void setLocation(TaskLocation taskLocation)
+ {
+ initialLocationRef.set(taskLocation);
+ }
+
+ public void updateStatus(TaskStatus taskStatus)
+ {
+ statusRef.set(taskStatus);
+ }
+
+ public void setFullyStartedTime(long currentTimeMillis)
+ {
+ taskFullyStartedTimeRef.set(currentTimeMillis);
+ }
+
+ public long taskPendingTimeInMs()
+ {
+ long currentFullyStartingTime = taskFullyStartedTimeRef.get();
+ if (currentFullyStartingTime == 0) {
+ return System.currentTimeMillis() - startTimeMillis;
+ } else {
+ return Math.max(0, currentFullyStartingTime - startTimeMillis);
+ }
+ }
+ }
+
+ public static class WorkerStats
+ {
+ String workerId;
+ TaskState state;
+ long duration;
+ long pendingTimeInMs;
+
+ /**
+ * For JSON deserialization only
+ */
+ public WorkerStats()
+ {
+ }
+
+ public WorkerStats(String workerId, TaskState state, long duration, long
pendingTimeInMs)
+ {
+ this.workerId = workerId;
+ this.state = state;
+ this.duration = duration;
+ this.pendingTimeInMs = pendingTimeInMs;
+ }
+
+ @JsonProperty
+ public String getWorkerId()
+ {
+ return workerId;
+ }
+
+ @JsonProperty
+ public TaskState getState()
+ {
+ return state;
+ }
+
+ @JsonProperty("durationMs")
+ public long getDuration()
+ {
+ return duration;
+ }
+
+ @JsonProperty("pendingMs")
+ public long getPendingTimeInMs()
+ {
+ return pendingTimeInMs;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]