This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f0cdd0c3f `TaskStartTimeoutFault` now depends on the last successful worker launch time. (#14172)
6f0cdd0c3f is described below

commit 6f0cdd0c3ff3ece039150f49df70dd4a49454495
Author: Karan Kumar <[email protected]>
AuthorDate: Wed May 3 00:05:15 2023 +0530

    `TaskStartTimeoutFault` now depends on the last successful worker launch time. (#14172)
    
    * `TaskStartTimeoutFault` now depends on the last successful worker launch time.
---
 docs/multi-stage-query/reference.md                |  2 +-
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 20 +++++++++---
 .../msq/indexing/error/TaskStartTimeoutFault.java  | 37 ++++++++++++----------
 .../org/apache/druid/msq/exec/MSQTasksTest.java    |  2 +-
 .../msq/indexing/error/MSQFaultSerdeTest.java      |  2 +-
 5 files changed, 39 insertions(+), 24 deletions(-)

diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index b65779bbdf..5b8a2b535b 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -427,7 +427,7 @@ The following table describes error codes you may encounter in the `multiStageQu
 | <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could not translate the provided native query to a multi-stage query.<br /> <br />This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
 | <a name="error_QueryRuntimeError">`QueryRuntimeError`</a> | MSQ uses the native query engine to run the leaf stages. This error tells MSQ that error is in native query runtime.<br /> <br /> Since this is a generic error, the user needs to look at logs for the error message and stack trace to figure out the next course of action. If the user is stuck, consider raising a `github` issue for assistance. |  `baseErrorMessage` error message from the native query runtime. |
 | <a name="error_RowTooLarge">`RowTooLarge`</a> | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |
-| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch `numTasks` tasks within `timeout` milliseconds.<br /><br />There may be insufficient available slots to start all the worker tasks simultaneously. Try splitting up your query into smaller chunks using a smaller value of [`maxNumTasks`](#context-parameters). Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch.<br /><br />`timeout`: Timeout, in milliseconds, that was exceeded. |
+| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch `numTasksNotStarted` worker out of total `totalTasks` workers tasks within `timeout` seconds of the last successful worker launch.<br /><br />There may be insufficient available slots to start all the worker tasks simultaneously. Try splitting up your query into smaller chunks using a smaller value of [`maxNumTasks`](#context-parameters). Another option is to increase capacity. | `numTasksNotStarted`: Number of  [...]
 | <a name="error_TooManyAttemptsForJob">`TooManyAttemptsForJob`</a> | Total relaunch attempt count across all workers exceeded max relaunch attempt limit. See the [Limits](#limits) table for the specific limit. | `maxRelaunchCount`: Max number of relaunches across all the workers defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: current relaunch counter for the job across all workers. <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootError [...]
 | <a name="error_TooManyAttemptsForWorker">`TooManyAttemptsForWorker`</a> | Worker exceeded maximum relaunch attempt count as defined in the [Limits](#limits) section. |`maxPerWorkerRelaunchCount`: Max number of relaunches allowed per worker as defined in the [Limits](#limits) section. <br /><br /> `workerNumber`: the worker number for which the task failed <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|
 | <a name="error_TooManyBuckets">`TooManyBuckets`</a> | Exceeded the maximum number of partition buckets for a stage (5,000 partition buckets).<br />< br />Partition buckets are created for each [`PARTITIONED BY`](#partitioned-by) time chunk for INSERT and REPLACE queries. The most common reason for this error is that your `PARTITIONED BY` is too narrow relative to your data. | `maxBuckets`: The limit on partition buckets. |
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 0890c14a84..7295e62e91 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -58,6 +58,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;
 
@@ -132,6 +133,8 @@ public class MSQWorkerTaskLauncher
   private final ConcurrentHashMap<Integer, List<String>> workerToTaskIds = new ConcurrentHashMap<>();
   private final RetryTask retryTask;
 
+  private final AtomicLong recentFullyStartedWorkerTimeInMs = new AtomicLong(System.currentTimeMillis());
+
   public MSQWorkerTaskLauncher(
       final String controllerTaskId,
       final String dataSource,
@@ -493,7 +496,9 @@ public class MSQWorkerTaskLauncher
 
         if (tracker.status.getStatusCode() == TaskState.RUNNING && !tracker.unknownLocation()) {
           synchronized (taskIds) {
-            fullyStartedTasks.add(tracker.workerNumber);
+            if (fullyStartedTasks.add(tracker.workerNumber)) {
+              recentFullyStartedWorkerTimeInMs.set(System.currentTimeMillis());
+            }
             taskIds.notifyAll();
           }
         }
@@ -533,7 +538,11 @@ public class MSQWorkerTaskLauncher
 
       } else if (tracker.didRunTimeOut(maxTaskStartDelayMillis) && !canceledWorkerTasks.contains(taskId)) {
         removeWorkerFromFullyStartedWorkers(tracker);
-        throw new MSQException(new TaskStartTimeoutFault(numTasks + 1, maxTaskStartDelayMillis));
+        throw new MSQException(new TaskStartTimeoutFault(
+            this.getWorkerTaskCount().getPendingWorkerCount(),
+            numTasks + 1,
+            maxTaskStartDelayMillis
+        ));
       } else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
         removeWorkerFromFullyStartedWorkers(tracker);
         log.info("Task[%s] failed because %s. Trying to relaunch the worker", taskId, tracker.status.getErrorMsg());
@@ -713,7 +722,7 @@ public class MSQWorkerTaskLauncher
   /**
    * Tracker for information about a worker. Mutable.
    */
-  private static class TaskTracker
+  private class TaskTracker
   {
     private final int workerNumber;
     private final long startTimeMs = System.currentTimeMillis();
@@ -744,11 +753,14 @@ public class MSQWorkerTaskLauncher
       return status != null && status.getStatusCode().isFailure();
     }
 
+    /**
+     * The timeout is checked from the recentFullyStartedWorkerTimeInMs. If it's more than maxTaskStartDelayMillis return true.
+     */
     public boolean didRunTimeOut(final long maxTaskStartDelayMillis)
     {
       return (status == null || status.getStatusCode() == TaskState.RUNNING)
              && unknownLocation()
-             && System.currentTimeMillis() - startTimeMs > maxTaskStartDelayMillis;
+             && System.currentTimeMillis() - recentFullyStartedWorkerTimeInMs.get() > maxTaskStartDelayMillis;
     }
 
     /**
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TaskStartTimeoutFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TaskStartTimeoutFault.java
index 43c5a802e5..29b8913498 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TaskStartTimeoutFault.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TaskStartTimeoutFault.java
@@ -32,33 +32,43 @@ public class TaskStartTimeoutFault extends BaseMSQFault
 {
   static final String CODE = "TaskStartTimeout";
 
-  private final int numTasks;
+  private final int numTasksNotStarted;
+  private final int totalTasks;
   private final long timeout;
 
   @JsonCreator
   public TaskStartTimeoutFault(
-      @JsonProperty("numTasks") int numTasks,
+      @JsonProperty("numTasksNotStarted") int numTasksNotStarted,
+      @JsonProperty("totalTasks") int totalTasks,
       @JsonProperty("timeout") long timeout
   )
   {
     super(
         CODE,
-        "Unable to launch [%d] worker tasks within [%,d] seconds. "
+        "Unable to launch [%d] workers out of the total [%d] worker tasks within [%,d] seconds of the last successful worker launch."
         + "There might be insufficient available slots to start all worker tasks simultaneously. "
         + "Try lowering '%s' in your query context to a number that fits 
within your available task capacity, "
         + "or try increasing capacity.",
-        numTasks,
+        numTasksNotStarted,
+        totalTasks,
         TimeUnit.MILLISECONDS.toSeconds(timeout),
         MultiStageQueryContext.CTX_MAX_NUM_TASKS
     );
-    this.numTasks = numTasks;
+    this.numTasksNotStarted = numTasksNotStarted;
+    this.totalTasks = totalTasks;
     this.timeout = timeout;
   }
 
   @JsonProperty
-  public int getNumTasks()
+  public int getNumTasksNotStarted()
   {
-    return numTasks;
+    return numTasksNotStarted;
+  }
+
+  @JsonProperty
+  public int getTotalTasks()
+  {
+    return totalTasks;
   }
 
   @JsonProperty
@@ -80,21 +90,14 @@ public class TaskStartTimeoutFault extends BaseMSQFault
       return false;
     }
     TaskStartTimeoutFault that = (TaskStartTimeoutFault) o;
-    return numTasks == that.numTasks && timeout == that.timeout;
+    return numTasksNotStarted == that.numTasksNotStarted && totalTasks == that.totalTasks && timeout == that.timeout;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), numTasks, timeout);
+    return Objects.hash(super.hashCode(), numTasksNotStarted, totalTasks, timeout);
   }
 
-  @Override
-  public String toString()
-  {
-    return "TaskStartTimeoutFault{" +
-           "numTasks=" + numTasks +
-           ", timeout=" + timeout +
-           '}';
-  }
+
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index bd6911c04d..73a443db8a 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -232,7 +232,7 @@ public class MSQTasksTest
     }
     catch (Exception e) {
       Assert.assertEquals(
-          MSQFaultUtils.generateMessageWithErrorCode(new TaskStartTimeoutFault(numTasks + 1, 5000)),
+          MSQFaultUtils.generateMessageWithErrorCode(new TaskStartTimeoutFault(5, numTasks + 1, 5000)),
           MSQFaultUtils.generateMessageWithErrorCode(((MSQException) e.getCause()).getFault())
       );
     }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index b46224d327..256397e9a2 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -66,7 +66,7 @@ public class MSQFaultSerdeTest
     assertFaultSerde(new QueryRuntimeFault("new error", "base error"));
     assertFaultSerde(new QueryRuntimeFault("new error", null));
     assertFaultSerde(new RowTooLargeFault(1000));
-    assertFaultSerde(new TaskStartTimeoutFault(10, 11));
+    assertFaultSerde(new TaskStartTimeoutFault(1, 10, 11));
     assertFaultSerde(new TooManyBucketsFault(10));
     assertFaultSerde(new TooManyColumnsFault(10, 8));
     assertFaultSerde(new TooManyClusteredByColumnsFault(10, 8, 1));
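
Note on the behavior change: the diff above moves the start-timeout baseline from each task's own `startTimeMs` to a shared timestamp that is refreshed whenever a worker is newly added to `fullyStartedTasks`. A minimal standalone sketch of that idea follows. It is not the Druid implementation: the class `StartTimeoutSketch`, its methods, and the injected `nowMs` clock values are hypothetical names chosen for illustration, and the real `didRunTimeOut` also checks task status and `unknownLocation()`, which this sketch omits.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not part of the Druid code base.
class StartTimeoutSketch
{
  // Worker numbers that have been observed as fully started.
  private final Set<Integer> fullyStartedWorkers = new HashSet<>();

  // Time of the most recent successful worker launch (initially the creation time).
  private final AtomicLong lastSuccessfulLaunchMs;

  StartTimeoutSketch(long createdAtMs)
  {
    this.lastSuccessfulLaunchMs = new AtomicLong(createdAtMs);
  }

  // Records a started worker. The timestamp moves only when the worker is new,
  // mirroring the `if (fullyStartedTasks.add(...))` guard in the diff.
  synchronized boolean markStarted(int workerNumber, long nowMs)
  {
    boolean newlyStarted = fullyStartedWorkers.add(workerNumber);
    if (newlyStarted) {
      lastSuccessfulLaunchMs.set(nowMs);
    }
    return newlyStarted;
  }

  // A pending worker times out only if no worker has started for longer than the limit.
  boolean didRunTimeOut(long nowMs, long maxTaskStartDelayMillis)
  {
    return nowMs - lastSuccessfulLaunchMs.get() > maxTaskStartDelayMillis;
  }

  public static void main(String[] args)
  {
    StartTimeoutSketch sketch = new StartTimeoutSketch(0L);
    sketch.markStarted(0, 1_000L);
    // 4,500 ms since the last successful launch: within a 5,000 ms limit.
    System.out.println(sketch.didRunTimeOut(5_500L, 5_000L));
    // 5,001 ms since the last successful launch: over the limit.
    System.out.println(sketch.didRunTimeOut(6_001L, 5_000L));
  }
}
```

Under these assumptions, a slow but steady trickle of worker launches keeps pushing the deadline forward, whereas the earlier per-task baseline would fail a task that had simply been queued longer than the limit.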

