This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a70a3d5d489 Fix cancellation bug in MSQ. (#15368)
a70a3d5d489 is described below

commit a70a3d5d4896b803ef8bad1af77c194a2ab45abe
Author: Karan Kumar <[email protected]>
AuthorDate: Wed Nov 15 18:22:51 2023 +0530

    Fix cancellation bug in MSQ. (#15368)
    
    Saw a bug where the MSQ controller task would continue to hold its task slot even after a cancel was issued.
    This was due to a deadlock created during worker launch: the main thread was waiting for worker tasks to spawn, while the cancel thread was waiting for those tasks to finish.
    The fix instructs the MSQWorkerTaskLauncher thread to stop creating new tasks, which unblocks the main thread so that it can release the slot.
    
    Also short-circuited the retriable-fault check (ControllerQueryKernel.isRetriableFault). The check now runs in the MSQWorkerTaskLauncher thread instead of the main event loop, so a task whose fault is deemed non-retriable fails faster.
---
 .../org/apache/druid/msq/exec/ControllerImpl.java   | 21 +++++++++++----------
 .../kernel/controller/ControllerQueryKernel.java    | 18 ++++++++++--------
 2 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index f9470fef37d..1da68a423fa 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -303,7 +303,7 @@ public class ControllerImpl implements Controller
   private Map<Integer, ClusterStatisticsMergeMode> stageToStatsMergingMode;
   private WorkerMemoryParameters workerMemoryParameters;
   private boolean isDurableStorageEnabled;
-  private boolean isFaultToleranceEnabled;
+  private final boolean isFaultToleranceEnabled;
   private volatile SegmentLoadStatusFetcher segmentLoadWaiter;
 
   public ControllerImpl(
@@ -316,6 +316,9 @@ public class ControllerImpl implements Controller
     this.isDurableStorageEnabled = 
MultiStageQueryContext.isDurableStorageEnabled(
         task.getQuerySpec().getQuery().context()
     );
+    this.isFaultToleranceEnabled = 
MultiStageQueryContext.isFaultToleranceEnabled(task.getQuerySpec()
+                                                                               
       .getQuery()
+                                                                               
       .context());
   }
 
   @Override
@@ -372,7 +375,7 @@ public class ControllerImpl implements Controller
     );
 
     if (workerTaskLauncher != null) {
-      workerTaskLauncher.waitForWorkerShutdown();
+      workerTaskLauncher.stop(true);
     }
   }
 
@@ -605,8 +608,6 @@ public class ControllerImpl implements Controller
   private QueryDefinition initializeQueryDefAndState(final Closer closer)
   {
     final QueryContext queryContext = task.getQuerySpec().getQuery().context();
-    isFaultToleranceEnabled = 
MultiStageQueryContext.isFaultToleranceEnabled(queryContext);
-
     if (isFaultToleranceEnabled) {
       if 
(!queryContext.containsKey(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE)) 
{
         // if context key not set, enable durableStorage automatically.
@@ -686,13 +687,13 @@ public class ControllerImpl implements Controller
         task.getDataSource(),
         context,
         (failedTask, fault) -> {
-          addToKernelManipulationQueue((kernel) -> {
-            if (isFaultToleranceEnabled) {
+          if (isFaultToleranceEnabled && 
ControllerQueryKernel.isRetriableFault(fault)) {
+            addToKernelManipulationQueue((kernel) -> {
               addToRetryQueue(kernel, failedTask.getWorkerNumber(), fault);
-            } else {
-              throw new MSQException(fault);
-            }
-          });
+            });
+          } else {
+            throw new MSQException(fault);
+          }
         },
         taskContextOverridesBuilder.build(),
         // 10 minutes +- 2 minutes jitter
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index 62a45c4eaa8..db47a4971a4 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -664,21 +664,23 @@ public class ControllerQueryKernel
    */
   public List<WorkOrder> getWorkInCaseWorkerEligibleForRetryElseThrow(int 
workerNumber, MSQFault msqFault)
   {
+    if (isRetriableFault(msqFault)) {
+      return getWorkInCaseWorkerEligibleForRetry(workerNumber);
+    } else {
+      throw new MSQException(msqFault);
+    }
+  }
 
+  public static boolean isRetriableFault(MSQFault msqFault)
+  {
     final String errorCode;
     if (msqFault instanceof WorkerFailedFault) {
       errorCode = MSQFaultUtils.getErrorCodeFromMessage((((WorkerFailedFault) 
msqFault).getErrorMsg()));
     } else {
       errorCode = msqFault.getErrorCode();
     }
-
-    log.info("Parsed out errorCode[%s] to check eligibility for retry", 
errorCode);
-
-    if (RETRIABLE_ERROR_CODES.contains(errorCode)) {
-      return getWorkInCaseWorkerEligibleForRetry(workerNumber);
-    } else {
-      throw new MSQException(msqFault);
-    }
+    log.debug("Parsed out errorCode[%s] to check eligibility for retry", 
errorCode);
+    return RETRIABLE_ERROR_CODES.contains(errorCode);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to