This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fc262dfbaf MSQ: Report the warning directly as an error if none of it 
is allowed by the user (#13198)
fc262dfbaf is described below

commit fc262dfbaf434614a2ef550775b0c6b736856e28
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Oct 20 13:43:10 2022 +0530

    MSQ: Report the warning directly as an error if none of it is allowed by 
the user (#13198)
    
    In MSQ, there can be an upper limit to the number of worker warnings. For 
example, for parseExceptions encountered while parsing the external data, the 
user can specify an upper limit to the number of parse exceptions that can be 
allowed before it throws an error of type TooManyWarnings.
    
    This PR makes it so that if the user disallows warnings of a certain type 
i.e. the limit is 0 (or is executing in strict mode), instead of throwing an 
error of type TooManyWarnings, we can directly surface the warning as the 
error, saving the user from the hassle of going through the warning reports.
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 21 ++++---
 .../java/org/apache/druid/msq/exec/WorkerImpl.java | 31 +++++++++-
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 11 ++++
 .../error/MSQWarningReportLimiterPublisher.java    | 66 ++++++++++++++++------
 .../org/apache/druid/msq/exec/MSQTasksTest.java    |  1 +
 .../druid/msq/indexing/error/MSQWarningsTest.java  |  8 +--
 .../org/apache/druid/msq/test/MSQTestBase.java     | 48 ++++++++++++----
 7 files changed, 142 insertions(+), 44 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index cf5a8902a1..68d73b2efd 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -533,15 +533,6 @@ public class ControllerImpl implements Controller
 
     log.debug("Query [%s] durable storage mode is set to %s.", 
queryDef.getQueryId(), isDurableStorageEnabled);
 
-    this.workerTaskLauncher = new MSQWorkerTaskLauncher(
-        id(),
-        task.getDataSource(),
-        context,
-        isDurableStorageEnabled,
-
-        // 10 minutes +- 2 minutes jitter
-        TimeUnit.SECONDS.toMillis(600 + 
ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
-    );
 
     long maxParseExceptions = -1;
 
@@ -552,6 +543,17 @@ public class ControllerImpl implements Controller
                                    
.orElse(MSQWarnings.DEFAULT_MAX_PARSE_EXCEPTIONS_ALLOWED);
     }
 
+
+    this.workerTaskLauncher = new MSQWorkerTaskLauncher(
+        id(),
+        task.getDataSource(),
+        context,
+        isDurableStorageEnabled,
+        maxParseExceptions,
+        // 10 minutes +- 2 minutes jitter
+        TimeUnit.SECONDS.toMillis(600 + 
ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
+    );
+
     this.faultsExceededChecker = new FaultsExceededChecker(
         ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions)
     );
@@ -644,6 +646,7 @@ public class ControllerImpl implements Controller
       // Present means the warning limit was exceeded, and warnings have 
therefore turned into an error.
       String errorCode = warningsExceeded.get().lhs;
       Long limit = warningsExceeded.get().rhs;
+
       workerError(MSQErrorReport.fromFault(
           id(),
           selfDruidNode.getHost(),
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 37be816749..d930d825f9 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -23,6 +23,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -67,11 +68,13 @@ import org.apache.druid.msq.indexing.InputChannelsImpl;
 import org.apache.druid.msq.indexing.KeyStatisticsCollectionProcessor;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher;
 import org.apache.druid.msq.indexing.error.MSQWarningReportPublisher;
 import org.apache.druid.msq.indexing.error.MSQWarningReportSimplePublisher;
+import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSliceReader;
 import org.apache.druid.msq.input.InputSlices;
@@ -127,6 +130,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -263,13 +267,38 @@ public class WorkerImpl implements Worker
       }
     });
 
+    long maxAllowedParseExceptions = 
Long.parseLong(task.getContext().getOrDefault(
+        MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
+        Long.MAX_VALUE
+    ).toString());
+
+    long maxVerboseParseExceptions;
+    if (maxAllowedParseExceptions == -1L) {
+      maxVerboseParseExceptions = Limits.MAX_VERBOSE_PARSE_EXCEPTIONS;
+    } else {
+      maxVerboseParseExceptions = Math.min(maxAllowedParseExceptions, 
Limits.MAX_VERBOSE_PARSE_EXCEPTIONS);
+    }
+
+    Set<String> criticalWarningCodes;
+    if (maxAllowedParseExceptions == 0) {
+      criticalWarningCodes = 
ImmutableSet.of(CannotParseExternalDataFault.CODE);
+    } else {
+      criticalWarningCodes = ImmutableSet.of();
+    }
+
     final MSQWarningReportPublisher msqWarningReportPublisher = new 
MSQWarningReportLimiterPublisher(
         new MSQWarningReportSimplePublisher(
             id(),
             controllerClient,
             id(),
             MSQTasks.getHostFromSelfNode(selfDruidNode)
-        )
+        ),
+        Limits.MAX_VERBOSE_WARNINGS,
+        ImmutableMap.of(CannotParseExternalDataFault.CODE, 
maxVerboseParseExceptions),
+        criticalWarningCodes,
+        controllerClient,
+        id(),
+        MSQTasks.getHostFromSelfNode(selfDruidNode)
     );
 
     closer.register(msqWarningReportPublisher);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 15c260bdfb..f84e067931 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -37,11 +37,13 @@ import org.apache.druid.msq.exec.ControllerContext;
 import org.apache.druid.msq.exec.ControllerImpl;
 import org.apache.druid.msq.exec.WorkerManagerClient;
 import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.indexing.error.WorkerFailedFault;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 
+import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -83,6 +85,9 @@ public class MSQWorkerTaskLauncher
   private final long maxTaskStartDelayMillis;
   private final boolean durableStageStorageEnabled;
 
+  @Nullable
+  private final Long maxParseExceptions;
+
   // Mutable state meant to be accessible by threads outside the main loop.
   private final SettableFuture<?> stopFuture = SettableFuture.create();
   private final AtomicReference<State> state = new 
AtomicReference<>(State.NEW);
@@ -111,6 +116,7 @@ public class MSQWorkerTaskLauncher
       final String dataSource,
       final ControllerContext context,
       final boolean durableStageStorageEnabled,
+      @Nullable final Long maxParseExceptions,
       final long maxTaskStartDelayMillis
   )
   {
@@ -121,6 +127,7 @@ public class MSQWorkerTaskLauncher
         "multi-stage-query-task-launcher[" + 
StringUtils.encodeForFormat(controllerTaskId) + "]-%s"
     );
     this.durableStageStorageEnabled = durableStageStorageEnabled;
+    this.maxParseExceptions = maxParseExceptions;
     this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
   }
 
@@ -308,6 +315,10 @@ public class MSQWorkerTaskLauncher
       
taskContext.put(MultiStageQueryContext.CTX_ENABLE_DURABLE_SHUFFLE_STORAGE, 
true);
     }
 
+    if (maxParseExceptions != null) {
+      taskContext.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 
maxParseExceptions);
+    }
+
     final int firstTask;
     final int taskCount;
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
index c72d70206a..9a8b3f79f6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
@@ -19,11 +19,13 @@
 
 package org.apache.druid.msq.indexing.error;
 
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.msq.exec.Limits;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.msq.exec.ControllerClient;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -35,35 +37,53 @@ import java.util.concurrent.ConcurrentHashMap;
 public class MSQWarningReportLimiterPublisher implements 
MSQWarningReportPublisher
 {
 
-  final MSQWarningReportPublisher delegate;
-  final long totalLimit;
-  final Map<String, Long> errorCodeToLimit;
-  final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new 
ConcurrentHashMap<>();
+  private final MSQWarningReportPublisher delegate;
+  private final long totalLimit;
+  private final Map<String, Long> errorCodeToLimit;
+  private final Set<String> criticalWarningCodes;
+  private final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new 
ConcurrentHashMap<>();
+  private final ControllerClient controllerClient;
+  private final String workerId;
+
+  @Nullable
+  private final String host;
 
   long totalCount = 0L;
 
   final Object lock = new Object();
 
-  public MSQWarningReportLimiterPublisher(MSQWarningReportPublisher delegate)
-  {
-    this(
-        delegate,
-        Limits.MAX_VERBOSE_WARNINGS,
-        ImmutableMap.of(
-            CannotParseExternalDataFault.CODE, 
Limits.MAX_VERBOSE_PARSE_EXCEPTIONS
-        )
-    );
-  }
-
+  /**
+   * Creates a publisher which publishes the warnings to the controller if 
they have not yet exceeded the allowed limit.
+   * Moreover, if a warning is disallowed, i.e. it's limit is set to 0, then 
the publisher directly reports the warning
+   * as an error
+   * {@code errorCodeToLimit} refers to the maximum number of verbose warnings 
that should be published. The actual
+   * limit for the warnings before which the controller should fail can be 
much higher and hence a separate {@code criticalWarningCodes}
+   *
+   * @param delegate The delegate publisher which publishes the allowed 
warnings
+   * @param totalLimit Total limit of warnings that a worker can publish
+   * @param errorCodeToLimit Map of error code to the number of allowed 
warnings that the publisher can publish
+   * @param criticalWarningCodes Error codes which if encountered should be 
thrown as error
+   * @param controllerClient Controller client (for directly sending the 
warning as an error)
+   * @param workerId workerId, used to construct the error report
+   * @param host worker' host, used to construct the error report
+   */
   public MSQWarningReportLimiterPublisher(
       MSQWarningReportPublisher delegate,
       long totalLimit,
-      Map<String, Long> errorCodeToLimit
+      Map<String, Long> errorCodeToLimit,
+      Set<String> criticalWarningCodes,
+      ControllerClient controllerClient,
+      String workerId,
+      @Nullable String host
   )
   {
     this.delegate = delegate;
     this.errorCodeToLimit = errorCodeToLimit;
+    this.criticalWarningCodes = criticalWarningCodes;
     this.totalLimit = totalLimit;
+    this.controllerClient = controllerClient;
+    this.workerId = workerId;
+    this.host = host;
   }
 
   @Override
@@ -74,6 +94,16 @@ public class MSQWarningReportLimiterPublisher implements 
MSQWarningReportPublish
       totalCount = totalCount + 1;
       errorCodeToCurrentCount.compute(errorCode, (ignored, count) -> count == 
null ? 1L : count + 1);
 
+      // Send the warning as an error if it is disallowed altogether
+      if (criticalWarningCodes.contains(errorCode)) {
+        try {
+          controllerClient.postWorkerError(workerId, 
MSQErrorReport.fromException(workerId, host, stageNumber, e));
+        }
+        catch (IOException postException) {
+          throw new RE(postException, "Failed to post the worker error [%s] to 
the controller", errorCode);
+        }
+      }
+
       if (totalLimit != -1 && totalCount > totalLimit) {
         return;
       }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index 1321d1362f..7792712c6a 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -151,6 +151,7 @@ public class MSQTasksTest
         "foo",
         controllerContext,
         false,
+        -1L,
         TimeUnit.SECONDS.toMillis(5)
     );
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java
index c41786ce3f..1aa689814a 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java
@@ -139,7 +139,7 @@ public class MSQWarningsTest extends MSQTestBase
                                 .columnMappings(defaultColumnMappings)
                                 .tuningConfig(MSQTuningConfig.defaultConfig())
                                 .build())
-                     .setExpectedMSQFault(new TooManyWarningsFault(0, 
CannotParseExternalDataFault.CODE))
+                     
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
                      .verifyResults();
   }
 
@@ -318,7 +318,7 @@ public class MSQWarningsTest extends MSQTestBase
                                 .columnMappings(defaultColumnMappings)
                                 .tuningConfig(MSQTuningConfig.defaultConfig())
                                 .build())
-                     .setExpectedMSQFault(new TooManyWarningsFault(0, 
CannotParseExternalDataFault.CODE))
+                     
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
                      .verifyResults();
   }
 
@@ -340,7 +340,7 @@ public class MSQWarningsTest extends MSQTestBase
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
                      .addExpectedAggregatorFactory(new 
LongSumAggregatorFactory("cnt", "cnt"))
-                     .setExpectedMSQFault(new TooManyWarningsFault(0, 
CannotParseExternalDataFault.CODE))
+                     
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
                      .verifyResults();
   }
 
@@ -359,7 +359,7 @@ public class MSQWarningsTest extends MSQTestBase
                              + ") group by 1  PARTITIONED by day ")
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
-                     .setExpectedMSQFault(new TooManyWarningsFault(0, 
CannotParseExternalDataFault.CODE))
+                     
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
                      .verifyResults();
 
     // Temporary directory should not contain any controller-related folders
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 45968dbd47..a1ac6b9dea 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -701,6 +701,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
     protected Matcher<Throwable> expectedValidationErrorMatcher = null;
     protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
     protected MSQFault expectedMSQFault = null;
+    protected Class<? extends MSQFault> expectedMSQFaultClass = null;
 
     private boolean hasRun = false;
 
@@ -763,6 +764,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
       return (Builder) this;
     }
 
+    public Builder setExpectedMSQFaultClass(Class<? extends MSQFault> 
expectedMSQFaultClass)
+    {
+      this.expectedMSQFaultClass = expectedMSQFaultClass;
+      return (Builder) this;
+    }
+
     public void verifyPlanningErrors()
     {
       Preconditions.checkArgument(expectedValidationErrorMatcher != null, 
"Validation error matcher cannot be null");
@@ -850,19 +857,28 @@ public class MSQTestBase extends BaseCalciteQueryTest
       Preconditions.checkArgument(expectedDataSource != null, "dataSource 
cannot be null");
       Preconditions.checkArgument(expectedRowSignature != null, 
"expectedRowSignature cannot be null");
       Preconditions.checkArgument(
-          expectedResultRows != null || expectedMSQFault != null,
-          "atleast one of expectedResultRows or expectedMSQFault should be set 
to non null"
+          expectedResultRows != null || expectedMSQFault != null || 
expectedMSQFaultClass != null,
+          "atleast one of expectedResultRows, expectedMSQFault or 
expectedMSQFaultClass should be set to non null"
       );
       Preconditions.checkArgument(expectedShardSpec != null, "shardSpecClass 
cannot be null");
       readyToRun();
       try {
         String controllerId = runMultiStageQuery(sql, queryContext);
-        if (expectedMSQFault != null) {
+        if (expectedMSQFault != null || expectedMSQFaultClass != null) {
           MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
-          Assert.assertEquals(
-              expectedMSQFault.getCodeWithMessage(),
-              msqErrorReport.getFault().getCodeWithMessage()
-          );
+          if (expectedMSQFault != null) {
+            Assert.assertEquals(
+                expectedMSQFault.getCodeWithMessage(),
+                msqErrorReport.getFault().getCodeWithMessage()
+            );
+          }
+          if (expectedMSQFaultClass != null) {
+            Assert.assertEquals(
+                expectedMSQFaultClass,
+                msqErrorReport.getFault().getClass()
+            );
+          }
+
           return;
         }
         getPayloadOrThrow(controllerId);
@@ -1016,12 +1032,20 @@ public class MSQTestBase extends BaseCalciteQueryTest
       try {
         String controllerId = runMultiStageQuery(sql, queryContext);
 
-        if (expectedMSQFault != null) {
+        if (expectedMSQFault != null || expectedMSQFaultClass != null) {
           MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
-          Assert.assertEquals(
-              expectedMSQFault.getCodeWithMessage(),
-              msqErrorReport.getFault().getCodeWithMessage()
-          );
+          if (expectedMSQFault != null) {
+            Assert.assertEquals(
+                expectedMSQFault.getCodeWithMessage(),
+                msqErrorReport.getFault().getCodeWithMessage()
+            );
+          }
+          if (expectedMSQFaultClass != null) {
+            Assert.assertEquals(
+                expectedMSQFaultClass,
+                msqErrorReport.getFault().getClass()
+            );
+          }
           return null;
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to