This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fc262dfbaf MSQ: Report the warning directly as an error if none of it
is allowed by the user (#13198)
fc262dfbaf is described below
commit fc262dfbaf434614a2ef550775b0c6b736856e28
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Oct 20 13:43:10 2022 +0530
MSQ: Report the warning directly as an error if none of it is allowed by
the user (#13198)
In MSQ, there can be an upper limit to the number of worker warnings. For
example, for parseExceptions encountered while parsing the external data, the
user can specify an upper limit to the number of parse exceptions that can be
allowed before it throws an error of type TooManyWarnings.
This PR makes it so that if the user disallows warnings of a certain type,
i.e. the limit is 0 (or the query is executing in strict mode), instead of
throwing an error of type TooManyWarnings, we can directly surface the warning
as the error, saving the user from the hassle of going through the warning reports.
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 21 ++++---
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 31 +++++++++-
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 11 ++++
.../error/MSQWarningReportLimiterPublisher.java | 66 ++++++++++++++++------
.../org/apache/druid/msq/exec/MSQTasksTest.java | 1 +
.../druid/msq/indexing/error/MSQWarningsTest.java | 8 +--
.../org/apache/druid/msq/test/MSQTestBase.java | 48 ++++++++++++----
7 files changed, 142 insertions(+), 44 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index cf5a8902a1..68d73b2efd 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -533,15 +533,6 @@ public class ControllerImpl implements Controller
log.debug("Query [%s] durable storage mode is set to %s.",
queryDef.getQueryId(), isDurableStorageEnabled);
- this.workerTaskLauncher = new MSQWorkerTaskLauncher(
- id(),
- task.getDataSource(),
- context,
- isDurableStorageEnabled,
-
- // 10 minutes +- 2 minutes jitter
- TimeUnit.SECONDS.toMillis(600 +
ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
- );
long maxParseExceptions = -1;
@@ -552,6 +543,17 @@ public class ControllerImpl implements Controller
.orElse(MSQWarnings.DEFAULT_MAX_PARSE_EXCEPTIONS_ALLOWED);
}
+
+ this.workerTaskLauncher = new MSQWorkerTaskLauncher(
+ id(),
+ task.getDataSource(),
+ context,
+ isDurableStorageEnabled,
+ maxParseExceptions,
+ // 10 minutes +- 2 minutes jitter
+ TimeUnit.SECONDS.toMillis(600 +
ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
+ );
+
this.faultsExceededChecker = new FaultsExceededChecker(
ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions)
);
@@ -644,6 +646,7 @@ public class ControllerImpl implements Controller
// Present means the warning limit was exceeded, and warnings have
therefore turned into an error.
String errorCode = warningsExceeded.get().lhs;
Long limit = warningsExceeded.get().rhs;
+
workerError(MSQErrorReport.fromFault(
id(),
selfDruidNode.getHost(),
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 37be816749..d930d825f9 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -67,11 +68,13 @@ import org.apache.druid.msq.indexing.InputChannelsImpl;
import org.apache.druid.msq.indexing.KeyStatisticsCollectionProcessor;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher;
import org.apache.druid.msq.indexing.error.MSQWarningReportPublisher;
import org.apache.druid.msq.indexing.error.MSQWarningReportSimplePublisher;
+import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.InputSlices;
@@ -127,6 +130,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -263,13 +267,38 @@ public class WorkerImpl implements Worker
}
});
+ long maxAllowedParseExceptions =
Long.parseLong(task.getContext().getOrDefault(
+ MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
+ Long.MAX_VALUE
+ ).toString());
+
+ long maxVerboseParseExceptions;
+ if (maxAllowedParseExceptions == -1L) {
+ maxVerboseParseExceptions = Limits.MAX_VERBOSE_PARSE_EXCEPTIONS;
+ } else {
+ maxVerboseParseExceptions = Math.min(maxAllowedParseExceptions,
Limits.MAX_VERBOSE_PARSE_EXCEPTIONS);
+ }
+
+ Set<String> criticalWarningCodes;
+ if (maxAllowedParseExceptions == 0) {
+ criticalWarningCodes =
ImmutableSet.of(CannotParseExternalDataFault.CODE);
+ } else {
+ criticalWarningCodes = ImmutableSet.of();
+ }
+
final MSQWarningReportPublisher msqWarningReportPublisher = new
MSQWarningReportLimiterPublisher(
new MSQWarningReportSimplePublisher(
id(),
controllerClient,
id(),
MSQTasks.getHostFromSelfNode(selfDruidNode)
- )
+ ),
+ Limits.MAX_VERBOSE_WARNINGS,
+ ImmutableMap.of(CannotParseExternalDataFault.CODE,
maxVerboseParseExceptions),
+ criticalWarningCodes,
+ controllerClient,
+ id(),
+ MSQTasks.getHostFromSelfNode(selfDruidNode)
);
closer.register(msqWarningReportPublisher);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 15c260bdfb..f84e067931 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -37,11 +37,13 @@ import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.ControllerImpl;
import org.apache.druid.msq.exec.WorkerManagerClient;
import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
+import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -83,6 +85,9 @@ public class MSQWorkerTaskLauncher
private final long maxTaskStartDelayMillis;
private final boolean durableStageStorageEnabled;
+ @Nullable
+ private final Long maxParseExceptions;
+
// Mutable state meant to be accessible by threads outside the main loop.
private final SettableFuture<?> stopFuture = SettableFuture.create();
private final AtomicReference<State> state = new
AtomicReference<>(State.NEW);
@@ -111,6 +116,7 @@ public class MSQWorkerTaskLauncher
final String dataSource,
final ControllerContext context,
final boolean durableStageStorageEnabled,
+ @Nullable final Long maxParseExceptions,
final long maxTaskStartDelayMillis
)
{
@@ -121,6 +127,7 @@ public class MSQWorkerTaskLauncher
"multi-stage-query-task-launcher[" +
StringUtils.encodeForFormat(controllerTaskId) + "]-%s"
);
this.durableStageStorageEnabled = durableStageStorageEnabled;
+ this.maxParseExceptions = maxParseExceptions;
this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
}
@@ -308,6 +315,10 @@ public class MSQWorkerTaskLauncher
taskContext.put(MultiStageQueryContext.CTX_ENABLE_DURABLE_SHUFFLE_STORAGE,
true);
}
+ if (maxParseExceptions != null) {
+ taskContext.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
maxParseExceptions);
+ }
+
final int firstTask;
final int taskCount;
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
index c72d70206a..9a8b3f79f6 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
@@ -19,11 +19,13 @@
package org.apache.druid.msq.indexing.error;
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.msq.exec.Limits;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.msq.exec.ControllerClient;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -35,35 +37,53 @@ import java.util.concurrent.ConcurrentHashMap;
public class MSQWarningReportLimiterPublisher implements
MSQWarningReportPublisher
{
- final MSQWarningReportPublisher delegate;
- final long totalLimit;
- final Map<String, Long> errorCodeToLimit;
- final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new
ConcurrentHashMap<>();
+ private final MSQWarningReportPublisher delegate;
+ private final long totalLimit;
+ private final Map<String, Long> errorCodeToLimit;
+ private final Set<String> criticalWarningCodes;
+ private final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new
ConcurrentHashMap<>();
+ private final ControllerClient controllerClient;
+ private final String workerId;
+
+ @Nullable
+ private final String host;
long totalCount = 0L;
final Object lock = new Object();
- public MSQWarningReportLimiterPublisher(MSQWarningReportPublisher delegate)
- {
- this(
- delegate,
- Limits.MAX_VERBOSE_WARNINGS,
- ImmutableMap.of(
- CannotParseExternalDataFault.CODE,
Limits.MAX_VERBOSE_PARSE_EXCEPTIONS
- )
- );
- }
-
+ /**
+ * Creates a publisher which publishes the warnings to the controller if
they have not yet exceeded the allowed limit.
+ * Moreover, if a warning is disallowed, i.e. it's limit is set to 0, then
the publisher directly reports the warning
+ * as an error
+ * {@code errorCodeToLimit} refers to the maximum number of verbose warnings
that should be published. The actual
+ * limit for the warnings before which the controller should fail can be
much higher and hence a separate {@code criticalWarningCodes}
+ *
+ * @param delegate The delegate publisher which publishes the allowed
warnings
+ * @param totalLimit Total limit of warnings that a worker can publish
+ * @param errorCodeToLimit Map of error code to the number of allowed
warnings that the publisher can publish
+ * @param criticalWarningCodes Error codes which if encountered should be
thrown as error
+ * @param controllerClient Controller client (for directly sending the
warning as an error)
+ * @param workerId workerId, used to construct the error report
+ * @param host worker' host, used to construct the error report
+ */
public MSQWarningReportLimiterPublisher(
MSQWarningReportPublisher delegate,
long totalLimit,
- Map<String, Long> errorCodeToLimit
+ Map<String, Long> errorCodeToLimit,
+ Set<String> criticalWarningCodes,
+ ControllerClient controllerClient,
+ String workerId,
+ @Nullable String host
)
{
this.delegate = delegate;
this.errorCodeToLimit = errorCodeToLimit;
+ this.criticalWarningCodes = criticalWarningCodes;
this.totalLimit = totalLimit;
+ this.controllerClient = controllerClient;
+ this.workerId = workerId;
+ this.host = host;
}
@Override
@@ -74,6 +94,16 @@ public class MSQWarningReportLimiterPublisher implements
MSQWarningReportPublish
totalCount = totalCount + 1;
errorCodeToCurrentCount.compute(errorCode, (ignored, count) -> count ==
null ? 1L : count + 1);
+ // Send the warning as an error if it is disallowed altogether
+ if (criticalWarningCodes.contains(errorCode)) {
+ try {
+ controllerClient.postWorkerError(workerId,
MSQErrorReport.fromException(workerId, host, stageNumber, e));
+ }
+ catch (IOException postException) {
+ throw new RE(postException, "Failed to post the worker error [%s] to
the controller", errorCode);
+ }
+ }
+
if (totalLimit != -1 && totalCount > totalLimit) {
return;
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index 1321d1362f..7792712c6a 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -151,6 +151,7 @@ public class MSQTasksTest
"foo",
controllerContext,
false,
+ -1L,
TimeUnit.SECONDS.toMillis(5)
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java
index c41786ce3f..1aa689814a 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java
@@ -139,7 +139,7 @@ public class MSQWarningsTest extends MSQTestBase
.columnMappings(defaultColumnMappings)
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
- .setExpectedMSQFault(new TooManyWarningsFault(0,
CannotParseExternalDataFault.CODE))
+
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
.verifyResults();
}
@@ -318,7 +318,7 @@ public class MSQWarningsTest extends MSQTestBase
.columnMappings(defaultColumnMappings)
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
- .setExpectedMSQFault(new TooManyWarningsFault(0,
CannotParseExternalDataFault.CODE))
+
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
.verifyResults();
}
@@ -340,7 +340,7 @@ public class MSQWarningsTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.addExpectedAggregatorFactory(new
LongSumAggregatorFactory("cnt", "cnt"))
- .setExpectedMSQFault(new TooManyWarningsFault(0,
CannotParseExternalDataFault.CODE))
+
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
.verifyResults();
}
@@ -359,7 +359,7 @@ public class MSQWarningsTest extends MSQTestBase
+ ") group by 1 PARTITIONED by day ")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
- .setExpectedMSQFault(new TooManyWarningsFault(0,
CannotParseExternalDataFault.CODE))
+
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
.verifyResults();
// Temporary directory should not contain any controller-related folders
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 45968dbd47..a1ac6b9dea 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -701,6 +701,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected Matcher<Throwable> expectedValidationErrorMatcher = null;
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
protected MSQFault expectedMSQFault = null;
+ protected Class<? extends MSQFault> expectedMSQFaultClass = null;
private boolean hasRun = false;
@@ -763,6 +764,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return (Builder) this;
}
+ public Builder setExpectedMSQFaultClass(Class<? extends MSQFault>
expectedMSQFaultClass)
+ {
+ this.expectedMSQFaultClass = expectedMSQFaultClass;
+ return (Builder) this;
+ }
+
public void verifyPlanningErrors()
{
Preconditions.checkArgument(expectedValidationErrorMatcher != null,
"Validation error matcher cannot be null");
@@ -850,19 +857,28 @@ public class MSQTestBase extends BaseCalciteQueryTest
Preconditions.checkArgument(expectedDataSource != null, "dataSource
cannot be null");
Preconditions.checkArgument(expectedRowSignature != null,
"expectedRowSignature cannot be null");
Preconditions.checkArgument(
- expectedResultRows != null || expectedMSQFault != null,
- "atleast one of expectedResultRows or expectedMSQFault should be set
to non null"
+ expectedResultRows != null || expectedMSQFault != null ||
expectedMSQFaultClass != null,
+ "atleast one of expectedResultRows, expectedMSQFault or
expectedMSQFaultClass should be set to non null"
);
Preconditions.checkArgument(expectedShardSpec != null, "shardSpecClass
cannot be null");
readyToRun();
try {
String controllerId = runMultiStageQuery(sql, queryContext);
- if (expectedMSQFault != null) {
+ if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
- Assert.assertEquals(
- expectedMSQFault.getCodeWithMessage(),
- msqErrorReport.getFault().getCodeWithMessage()
- );
+ if (expectedMSQFault != null) {
+ Assert.assertEquals(
+ expectedMSQFault.getCodeWithMessage(),
+ msqErrorReport.getFault().getCodeWithMessage()
+ );
+ }
+ if (expectedMSQFaultClass != null) {
+ Assert.assertEquals(
+ expectedMSQFaultClass,
+ msqErrorReport.getFault().getClass()
+ );
+ }
+
return;
}
getPayloadOrThrow(controllerId);
@@ -1016,12 +1032,20 @@ public class MSQTestBase extends BaseCalciteQueryTest
try {
String controllerId = runMultiStageQuery(sql, queryContext);
- if (expectedMSQFault != null) {
+ if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
- Assert.assertEquals(
- expectedMSQFault.getCodeWithMessage(),
- msqErrorReport.getFault().getCodeWithMessage()
- );
+ if (expectedMSQFault != null) {
+ Assert.assertEquals(
+ expectedMSQFault.getCodeWithMessage(),
+ msqErrorReport.getFault().getCodeWithMessage()
+ );
+ }
+ if (expectedMSQFaultClass != null) {
+ Assert.assertEquals(
+ expectedMSQFaultClass,
+ msqErrorReport.getFault().getClass()
+ );
+ }
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]