cryptoe commented on code in PR #13198:
URL: https://github.com/apache/druid/pull/13198#discussion_r994241988


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -260,13 +264,36 @@ public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception
       }
     });
 
+    long maxAllowedParseExceptions = Long.parseLong(task.getContext().getOrDefault(
+        MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
+        Long.MAX_VALUE
+    ).toString());
+
+    long maxVerboseParseExceptions;
+    if (maxAllowedParseExceptions == -1L) {
+      maxVerboseParseExceptions = Limits.MAX_VERBOSE_PARSE_EXCEPTIONS;
+    } else {
+      maxVerboseParseExceptions = Math.min(maxAllowedParseExceptions, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS);
+    }
+
+    Set<String> disallowedWarningCode = ImmutableSet.of();
+    if (maxAllowedParseExceptions == 0) {
+      disallowedWarningCode = ImmutableSet.of(CannotParseExternalDataFault.CODE);
+    }
+
     final MSQWarningReportPublisher msqWarningReportPublisher = new MSQWarningReportLimiterPublisher(
         new MSQWarningReportSimplePublisher(
             id(),
             controllerClient,
             id(),
             MSQTasks.getHostFromSelfNode(selfDruidNode)
-        )
+        ),
+        Limits.MAX_VERBOSE_WARNINGS,
+        ImmutableMap.of(CannotParseExternalDataFault.CODE, maxVerboseParseExceptions),
+        disallowedWarningCode,
+        controllerClient,
+        id(),

Review Comment:
   Nit: `id`, `host`, and `controllerClient` should be the first arguments.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -260,13 +264,36 @@ public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception
       }
     });
 
+    long maxAllowedParseExceptions = Long.parseLong(task.getContext().getOrDefault(
+        MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
+        Long.MAX_VALUE
+    ).toString());
+
+    long maxVerboseParseExceptions;
+    if (maxAllowedParseExceptions == -1L) {
+      maxVerboseParseExceptions = Limits.MAX_VERBOSE_PARSE_EXCEPTIONS;
+    } else {
+      maxVerboseParseExceptions = Math.min(maxAllowedParseExceptions, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS);
+    }
+
+    Set<String> disallowedWarningCode = ImmutableSet.of();

Review Comment:
   Nit: you can just set this to null and initialize it in the else branch.
   Also, we could rename this to critical error codes.
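   A minimal sketch of a variant of this suggestion, assuming the semantics visible in the hunk (`-1` means unlimited, `0` means no parse errors are tolerated, an absent key means `Long.MAX_VALUE`). Declaring the local `final` and assigning it in both branches lets the compiler check that it is always initialized. The constant values and `criticalErrorCodes` name are hypothetical stand-ins for `MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED`, `Limits.MAX_VERBOSE_PARSE_EXCEPTIONS`, and `CannotParseExternalDataFault.CODE`, not the real values.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

class ParseExceptionLimits
{
  // Hypothetical stand-ins for MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
  // Limits.MAX_VERBOSE_PARSE_EXCEPTIONS and CannotParseExternalDataFault.CODE.
  static final String CTX_MAX_PARSE_EXCEPTIONS_ALLOWED = "maxParseExceptions";
  static final long MAX_VERBOSE_PARSE_EXCEPTIONS = 5;
  static final String CANNOT_PARSE_CODE = "CannotParseExternalData";

  /** Reads the limit from the query context; an absent key means effectively unlimited. */
  static long maxAllowedParseExceptions(Map<String, Object> context)
  {
    return Long.parseLong(String.valueOf(context.getOrDefault(CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, Long.MAX_VALUE)));
  }

  /** -1 means "no limit", so fall back to the global verbose cap; otherwise take the smaller of the two. */
  static long maxVerboseParseExceptions(long maxAllowedParseExceptions)
  {
    if (maxAllowedParseExceptions == -1L) {
      return MAX_VERBOSE_PARSE_EXCEPTIONS;
    }
    return Math.min(maxAllowedParseExceptions, MAX_VERBOSE_PARSE_EXCEPTIONS);
  }

  /** Parse errors become critical (reported as errors, not warnings) only when none are allowed. */
  static Set<String> criticalErrorCodes(long maxAllowedParseExceptions)
  {
    final Set<String> criticalErrorCodes;
    if (maxAllowedParseExceptions == 0) {
      criticalErrorCodes = Collections.singleton(CANNOT_PARSE_CODE);
    } else {
      criticalErrorCodes = Collections.emptySet();
    }
    return criticalErrorCodes;
  }

  public static void main(String[] args)
  {
    System.out.println(maxVerboseParseExceptions(-1)); // 5
    System.out.println(maxVerboseParseExceptions(2));  // 2
    System.out.println(criticalErrorCodes(0));         // [CannotParseExternalData]
  }
}
```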



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java:
##########
@@ -35,35 +36,38 @@
 public class MSQWarningReportLimiterPublisher implements MSQWarningReportPublisher
 {
 
-  final MSQWarningReportPublisher delegate;
-  final long totalLimit;
-  final Map<String, Long> errorCodeToLimit;
-  final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new ConcurrentHashMap<>();
+  private final MSQWarningReportPublisher delegate;
+  private final long totalLimit;
+  private final Map<String, Long> errorCodeToVerboseCountLimit;
+  private final Set<String> disallowedWarningCode;
+  private final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new ConcurrentHashMap<>();
+  private final ControllerClient controllerClient;
+  private final String workerId;
+
+  @Nullable
+  private final String host;
 
   long totalCount = 0L;
 
   final Object lock = new Object();
 
-  public MSQWarningReportLimiterPublisher(MSQWarningReportPublisher delegate)
-  {
-    this(
-        delegate,
-        Limits.MAX_VERBOSE_WARNINGS,
-        ImmutableMap.of(
-            CannotParseExternalDataFault.CODE, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS
-        )
-    );
-  }
-
   public MSQWarningReportLimiterPublisher(

Review Comment:
   I think it's about time to add a Javadoc here.
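   One possible shape for that Javadoc, which also applies the earlier nit of putting the worker id, host, and controller client first. `WarningPublisher` and `WorkerErrorClient` are hypothetical simplified stand-ins for `MSQWarningReportPublisher` and `ControllerClient`, `errorCodeToLimit`/`criticalErrorCodes` follow the naming suggestions in this review, and the parameter descriptions are inferred from the hunks here rather than taken from the PR.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for MSQWarningReportPublisher. */
interface WarningPublisher
{
  void publishException(int stageNumber, Throwable e);
}

/** Hypothetical stand-in for the ControllerClient method used by the limiter. */
interface WorkerErrorClient
{
  void postWorkerError(String workerId, String errorReport) throws IOException;
}

class LimiterPublisherSketch
{
  private final String workerId;
  private final String host;
  private final WorkerErrorClient controllerClient;
  private final WarningPublisher delegate;
  private final long totalLimit;
  private final Map<String, Long> errorCodeToLimit;
  private final Set<String> criticalErrorCodes;

  /**
   * Creates a publisher that forwards warnings to {@code delegate} until the configured limits are hit,
   * and reports warnings whose code is critical to the controller as worker errors.
   *
   * @param workerId           id of the worker publishing the warnings
   * @param host               host of the worker, or null if unknown
   * @param controllerClient   client used to post critical warnings to the controller as errors
   * @param delegate           publisher that receives the warnings that are within the limits
   * @param totalLimit         maximum number of warnings forwarded across all error codes
   * @param errorCodeToLimit   per-error-code cap on the number of forwarded warnings
   * @param criticalErrorCodes error codes that are reported as worker errors instead of warnings
   */
  LimiterPublisherSketch(
      String workerId,
      String host,
      WorkerErrorClient controllerClient,
      WarningPublisher delegate,
      long totalLimit,
      Map<String, Long> errorCodeToLimit,
      Set<String> criticalErrorCodes
  )
  {
    this.workerId = Objects.requireNonNull(workerId, "workerId");
    this.host = host; // nullable, like the @Nullable host field in the hunk
    this.controllerClient = Objects.requireNonNull(controllerClient, "controllerClient");
    this.delegate = Objects.requireNonNull(delegate, "delegate");
    this.totalLimit = totalLimit;
    this.errorCodeToLimit = Objects.requireNonNull(errorCodeToLimit, "errorCodeToLimit");
    this.criticalErrorCodes = Objects.requireNonNull(criticalErrorCodes, "criticalErrorCodes");
  }

  public static void main(String[] args)
  {
    LimiterPublisherSketch publisher = new LimiterPublisherSketch(
        "worker-0",
        null,
        (id, report) -> {},
        (stage, t) -> {},
        10,
        Collections.singletonMap("CannotParseExternalData", 2L),
        Collections.emptySet()
    );
    System.out.println(publisher.workerId);
  }
}
```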



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java:
##########
@@ -851,19 +858,28 @@ public void verifyResults()
       Preconditions.checkArgument(expectedDataSource != null, "dataSource cannot be null");
       Preconditions.checkArgument(expectedRowSignature != null, "expectedRowSignature cannot be null");
       Preconditions.checkArgument(
-          expectedResultRows != null || expectedMSQFault != null,
-          "atleast one of expectedResultRows or expectedMSQFault should be set to non null"
+          expectedResultRows != null || expectedMSQFault != null || expectedMSQFaultClass != null,
+          "atleast one of expectedResultRows, expectedMSQFault or expectedMSQFaultClass should be set to non null"
       );
       Preconditions.checkArgument(expectedShardSpec != null, "shardSpecClass cannot be null");
       readyToRun();
       try {
         String controllerId = runMultiStageQuery(sql, queryContext);
-        if (expectedMSQFault != null) {
+        if (expectedMSQFault != null || expectedMSQFaultClass != null) {
           MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
-          Assert.assertEquals(
-              expectedMSQFault.getCodeWithMessage(),
-              msqErrorReport.getFault().getCodeWithMessage()
-          );
+          if (expectedMSQFault != null) {
+            Assert.assertEquals(
+                expectedMSQFault.getCodeWithMessage(),
+                msqErrorReport.getFault().getCodeWithMessage()
+            );
+          }
+          if (expectedMSQFaultClass != null) {

Review Comment:
   This got me thinking: if we have a malformed line, we can get a ParseException carrying megabytes of input.
   Multiple such lines would blow up our report.
   Should we truncate the input to, let's say, 300 bytes or so in ParseException#75?
   cc @jon-wei
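   A minimal sketch of the proposed truncation, assuming the cap is meant in UTF-8 bytes and that cutting on a code-point boundary is acceptable. The 300-byte figure comes from the comment; `truncateUtf8` and the `...` marker are hypothetical, and this is not existing `ParseException` code.

```java
import java.util.Arrays;

class ParseErrorTruncation
{
  // Hypothetical cap taken from the review comment ("300 bytes or something").
  static final int MAX_INPUT_BYTES = 300;

  /**
   * Returns input unchanged if its UTF-8 encoding fits in maxBytes; otherwise returns the longest
   * prefix that fits, cut on a code-point boundary, followed by "..." (the marker is not counted).
   */
  static String truncateUtf8(String input, int maxBytes)
  {
    if (input == null) {
      return null;
    }
    int bytes = 0;
    int i = 0;
    while (i < input.length()) {
      int codePoint = input.codePointAt(i);
      // UTF-8 width of this code point; a lone surrogate is counted as 3 bytes, an upper bound.
      int width = codePoint < 0x80 ? 1 : codePoint < 0x800 ? 2 : codePoint < 0x10000 ? 3 : 4;
      if (bytes + width > maxBytes) {
        return input.substring(0, i) + "...";
      }
      bytes += width;
      i += Character.charCount(codePoint);
    }
    return input;
  }

  public static void main(String[] args)
  {
    // Simulate a multi-megabyte malformed row.
    char[] chars = new char[5_000_000];
    Arrays.fill(chars, 'x');
    String hugeRow = new String(chars);
    System.out.println(truncateUtf8(hugeRow, MAX_INPUT_BYTES).length()); // 303
  }
}
```

   Counting bytes rather than chars keeps the bound meaningful for non-ASCII input, where one char can take up to three bytes on the wire.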



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java:
##########
@@ -74,12 +78,22 @@ public void publishException(int stageNumber, Throwable e)
       totalCount = totalCount + 1;
       errorCodeToCurrentCount.compute(errorCode, (ignored, count) -> count == null ? 1L : count + 1);
 
+      // Send the warning as an error if it is disallowed altogether
+      if (disallowedWarningCode.contains(errorCode)) {
+        try {
+          controllerClient.postWorkerError(workerId, MSQErrorReport.fromException(workerId, host, stageNumber, e));
+        }
+        catch (IOException e2) {

Review Comment:
   Can we throw a new RE with the message `failed to post the worker error xyz to the controller`? That way the worker will get terminated with a relevant error.
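   A sketch of that failure handling under stated assumptions: `ErrorPoster` is a hypothetical stand-in for `ControllerClient#postWorkerError`, and a plain `RuntimeException` stands in for Druid's `RE` so the example stays self-contained. Chaining the `IOException` as the cause keeps the original failure in the worker's stack trace.

```java
import java.io.IOException;

class CriticalWarningReporter
{
  /** Hypothetical stand-in for the ControllerClient call used in the hunk. */
  interface ErrorPoster
  {
    void postWorkerError(String workerId, String errorReport) throws IOException;
  }

  /**
   * Posts a critical warning to the controller as a worker error. If posting fails, throws an unchecked
   * exception so the worker terminates with a relevant message instead of silently dropping the error.
   */
  static void postCriticalWarning(ErrorPoster client, String workerId, String errorReport)
  {
    try {
      client.postWorkerError(workerId, errorReport);
    }
    catch (IOException e) {
      // In Druid this would be an RE; RuntimeException keeps the sketch free of Druid dependencies.
      throw new RuntimeException(
          String.format("Failed to post the worker error [%s] to the controller", errorReport),
          e
      );
    }
  }

  public static void main(String[] args)
  {
    try {
      postCriticalWarning(
          (id, report) -> {
            throw new IOException("controller unreachable");
          },
          "worker-0",
          "CannotParseExternalData"
      );
    }
    catch (RuntimeException e) {
      System.out.println(e.getMessage());
    }
  }
}
```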



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java:
##########
@@ -35,35 +36,38 @@
 public class MSQWarningReportLimiterPublisher implements MSQWarningReportPublisher
 {
 
-  final MSQWarningReportPublisher delegate;
-  final long totalLimit;
-  final Map<String, Long> errorCodeToLimit;
-  final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new ConcurrentHashMap<>();
+  private final MSQWarningReportPublisher delegate;
+  private final long totalLimit;
+  private final Map<String, Long> errorCodeToVerboseCountLimit;

Review Comment:
   Nit: `errorCodeToLimit` seems like a better variable name to me.
   We really do not need `verboseCountLimit`, as that's a concept outside this class.
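   To illustrate the naming point: inside the limiter, the map is simply a per-error-code cap on forwarded warnings, so `errorCodeToLimit` describes it without any notion of verbosity. This is a simplified, hypothetical model of the counting shown in the `publishException` hunk (a total count plus per-code counts, updated under a lock), not the PR's implementation; it returns whether a warning would be forwarded instead of calling a delegate, and it uses a plain `HashMap` because every access is guarded by the lock.

```java
import java.util.HashMap;
import java.util.Map;

class WarningLimiterSketch
{
  private final long totalLimit;
  private final Map<String, Long> errorCodeToLimit;
  private final Map<String, Long> errorCodeToCurrentCount = new HashMap<>();
  private final Object lock = new Object();
  private long totalCount = 0L;

  WarningLimiterSketch(long totalLimit, Map<String, Long> errorCodeToLimit)
  {
    this.totalLimit = totalLimit;
    this.errorCodeToLimit = errorCodeToLimit;
  }

  /** Records one warning with the given code and returns true if it is still within both limits. */
  boolean shouldForward(String errorCode)
  {
    synchronized (lock) {
      totalCount = totalCount + 1;
      long count = errorCodeToCurrentCount.merge(errorCode, 1L, Long::sum);
      // Codes without an explicit entry are bounded only by the total limit.
      long codeLimit = errorCodeToLimit.getOrDefault(errorCode, Long.MAX_VALUE);
      return totalCount <= totalLimit && count <= codeLimit;
    }
  }

  public static void main(String[] args)
  {
    Map<String, Long> limits = new HashMap<>();
    limits.put("CannotParseExternalData", 1L);
    WarningLimiterSketch limiter = new WarningLimiterSketch(3, limits);
    System.out.println(limiter.shouldForward("CannotParseExternalData")); // true
    System.out.println(limiter.shouldForward("CannotParseExternalData")); // false
    System.out.println(limiter.shouldForward("Other"));                   // true
    System.out.println(limiter.shouldForward("Other"));                   // false
  }
}
```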
   


