cryptoe commented on code in PR #13198:
URL: https://github.com/apache/druid/pull/13198#discussion_r994241988
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -260,13 +264,36 @@ public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception
}
});
+ long maxAllowedParseExceptions = Long.parseLong(task.getContext().getOrDefault(
+ MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
+ Long.MAX_VALUE
+ ).toString());
+
+ long maxVerboseParseExceptions;
+ if (maxAllowedParseExceptions == -1L) {
+ maxVerboseParseExceptions = Limits.MAX_VERBOSE_PARSE_EXCEPTIONS;
+ } else {
+ maxVerboseParseExceptions = Math.min(maxAllowedParseExceptions, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS);
+ }
+
+ Set<String> disallowedWarningCode = ImmutableSet.of();
+ if (maxAllowedParseExceptions == 0) {
+ disallowedWarningCode = ImmutableSet.of(CannotParseExternalDataFault.CODE);
+ }
+
final MSQWarningReportPublisher msqWarningReportPublisher = new MSQWarningReportLimiterPublisher(
new MSQWarningReportSimplePublisher(
id(),
controllerClient,
id(),
MSQTasks.getHostFromSelfNode(selfDruidNode)
- )
+ ),
+ Limits.MAX_VERBOSE_WARNINGS,
+ ImmutableMap.of(CannotParseExternalDataFault.CODE, maxVerboseParseExceptions),
+ disallowedWarningCode,
+ controllerClient,
+ id(),
Review Comment:
nit: id, host, controllerClient should be the first arguments
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -260,13 +264,36 @@ public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception
}
});
+ long maxAllowedParseExceptions = Long.parseLong(task.getContext().getOrDefault(
+ MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
+ Long.MAX_VALUE
+ ).toString());
+
+ long maxVerboseParseExceptions;
+ if (maxAllowedParseExceptions == -1L) {
+ maxVerboseParseExceptions = Limits.MAX_VERBOSE_PARSE_EXCEPTIONS;
+ } else {
+ maxVerboseParseExceptions = Math.min(maxAllowedParseExceptions, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS);
+ }
+
+ Set<String> disallowedWarningCode = ImmutableSet.of();
Review Comment:
Nit: you can just set this to null and initialize it in the else branch.
Also, we could rename this to "critical error codes".
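A minimal sketch of what this could look like, assuming the variable is renamed along the lines suggested above. It is a slight variation on the suggestion: rather than assigning null first, it leaves a final local unassigned and sets it in both branches, which Java's definite-assignment rules allow. The class name, method name, and the `parseFaultCode` parameter (a stand-in for `CannotParseExternalDataFault.CODE`) are hypothetical, and `java.util.Collections` is used in place of Guava's `ImmutableSet` only to keep the example self-contained.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical helper, not part of the PR: illustrates assigning the set in both branches.
final class CriticalWarningCodesSketch
{
  private CriticalWarningCodesSketch() {}

  // parseFaultCode stands in for CannotParseExternalDataFault.CODE.
  static Set<String> criticalWarningCodes(long maxAllowedParseExceptions, String parseFaultCode)
  {
    // A final local may be declared without a value as long as every branch assigns it.
    final Set<String> criticalWarningCodes;
    if (maxAllowedParseExceptions == 0) {
      // No parse exceptions are tolerated, so the parse fault is treated as critical.
      criticalWarningCodes = Collections.singleton(parseFaultCode);
    } else {
      criticalWarningCodes = Collections.emptySet();
    }
    return criticalWarningCodes;
  }
}
```

With this shape the variable is never observed as null, so the publisher can call `contains` on it without a null check.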
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java:
##########
@@ -35,35 +36,38 @@
public class MSQWarningReportLimiterPublisher implements MSQWarningReportPublisher
{
- final MSQWarningReportPublisher delegate;
- final long totalLimit;
- final Map<String, Long> errorCodeToLimit;
- final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new ConcurrentHashMap<>();
+ private final MSQWarningReportPublisher delegate;
+ private final long totalLimit;
+ private final Map<String, Long> errorCodeToVerboseCountLimit;
+ private final Set<String> disallowedWarningCode;
+ private final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new ConcurrentHashMap<>();
+ private final ControllerClient controllerClient;
+ private final String workerId;
+
+ @Nullable
+ private final String host;
long totalCount = 0L;
final Object lock = new Object();
- public MSQWarningReportLimiterPublisher(MSQWarningReportPublisher delegate)
- {
- this(
- delegate,
- Limits.MAX_VERBOSE_WARNINGS,
- ImmutableMap.of(
- CannotParseExternalDataFault.CODE, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS
- )
- );
- }
-
public MSQWarningReportLimiterPublisher(
Review Comment:
I think it's about time to add a Javadoc here.
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java:
##########
@@ -851,19 +858,28 @@ public void verifyResults()
Preconditions.checkArgument(expectedDataSource != null, "dataSource cannot be null");
Preconditions.checkArgument(expectedRowSignature != null, "expectedRowSignature cannot be null");
Preconditions.checkArgument(
- expectedResultRows != null || expectedMSQFault != null,
- "atleast one of expectedResultRows or expectedMSQFault should be set to non null"
+ expectedResultRows != null || expectedMSQFault != null || expectedMSQFaultClass != null,
+ "atleast one of expectedResultRows, expectedMSQFault or expectedMSQFaultClass should be set to non null"
);
Preconditions.checkArgument(expectedShardSpec != null, "shardSpecClass cannot be null");
readyToRun();
try {
String controllerId = runMultiStageQuery(sql, queryContext);
- if (expectedMSQFault != null) {
+ if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
- Assert.assertEquals(
- expectedMSQFault.getCodeWithMessage(),
- msqErrorReport.getFault().getCodeWithMessage()
- );
+ if (expectedMSQFault != null) {
+ Assert.assertEquals(
+ expectedMSQFault.getCodeWithMessage(),
+ msqErrorReport.getFault().getCodeWithMessage()
+ );
+ }
+ if (expectedMSQFaultClass != null) {
Review Comment:
This got me thinking: if we have a malformed line, we can end up with a
ParseException carrying x MB of input.
Multiple such lines would blow up our report.
Should we chomp the input to, let's say, 300 bytes or so in
ParseException#75?
cc @jon-wei
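A rough sketch of the kind of truncation meant here, under stated assumptions: the class, method, and constant names are hypothetical, and the cap counts Java chars rather than bytes, so a 300-char prefix can exceed 300 bytes for non-ASCII input. A plain `substring` can also split a surrogate pair at the boundary, which a real implementation may want to guard against.

```java
// Hypothetical helper, not an existing Druid class: caps the input echoed in a parse error.
final class ParseInputTruncator
{
  // Illustrative cap, taken from the "300 bytes or so" figure in the comment above.
  static final int MAX_INPUT_CHARS = 300;

  private ParseInputTruncator() {}

  static String truncate(String input, int maxChars)
  {
    if (maxChars < 0) {
      throw new IllegalArgumentException("maxChars must be non-negative");
    }
    // Null and short inputs pass through unchanged.
    if (input == null || input.length() <= maxChars) {
      return input;
    }
    // Keep the prefix and record how much was dropped, so the report stays small but informative.
    return input.substring(0, maxChars) + "... (" + (input.length() - maxChars) + " more chars)";
  }
}
```

Truncating where the exception is constructed would bound the size of every report that carries it, rather than relying on each publisher to trim.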
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java:
##########
@@ -74,12 +78,22 @@ public void publishException(int stageNumber, Throwable e)
totalCount = totalCount + 1;
errorCodeToCurrentCount.compute(errorCode, (ignored, count) -> count == null ? 1L : count + 1);
+ // Send the warning as an error if it is disallowed altogether
+ if (disallowedWarningCode.contains(errorCode)) {
+ try {
+ controllerClient.postWorkerError(workerId, MSQErrorReport.fromException(workerId, host, stageNumber, e));
+ }
+ catch (IOException e2) {
Review Comment:
Can we throw a new RE with the message
`failed to post the worker error xyz to the controller`?
That way the worker will get terminated with a relevant error.
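Something along these lines, as a sketch only. `RuntimeException` stands in for Druid's `RE` so the example compiles on its own, and the `ErrorSink` interface and `postOrFail` method are hypothetical stand-ins for the controller client call in the diff.

```java
import java.io.IOException;

// Hypothetical sketch, not the PR's code: rethrow a failed post as an unchecked exception.
final class WorkerErrorPoster
{
  // Stand-in for the controller client call, which may fail with an IOException.
  interface ErrorSink
  {
    void postWorkerError(String workerId, String report) throws IOException;
  }

  private WorkerErrorPoster() {}

  static void postOrFail(ErrorSink sink, String workerId, String report)
  {
    try {
      sink.postWorkerError(workerId, report);
    }
    catch (IOException e) {
      // Keep the IOException as the cause so the original failure stays visible in the stack trace.
      throw new RuntimeException(
          String.format("Failed to post the worker error [%s] to the controller", report),
          e
      );
    }
  }
}
```

Because the exception is unchecked, it propagates out of `publishException` without changing the method signature.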
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java:
##########
@@ -35,35 +36,38 @@
public class MSQWarningReportLimiterPublisher implements MSQWarningReportPublisher
{
- final MSQWarningReportPublisher delegate;
- final long totalLimit;
- final Map<String, Long> errorCodeToLimit;
- final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new ConcurrentHashMap<>();
+ private final MSQWarningReportPublisher delegate;
+ private final long totalLimit;
+ private final Map<String, Long> errorCodeToVerboseCountLimit;
Review Comment:
nit: errorCodeToLimit seems like a better variable name to me.
We really do not need verboseCountLimit, as that's a concept outside this
class.