This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f9f734cde5 Display the output column name in InvalidNullByteException (#14780)
f9f734cde5 is described below

commit f9f734cde5b6dd6e540b189645de0eaf6f18a091
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Aug 24 04:24:41 2023 +0000

    Display the output column name in InvalidNullByteException (#14780)
    
    This PR maps the internal query column name to the user-facing output column
    name when surfacing the fault, since the output column name is the one the user
    sees in the SQL query being executed.
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 50 ++++++++++++++++++++--
 .../msq/indexing/error/InvalidNullByteFault.java   |  3 +-
 .../druid/msq/indexing/error/MSQErrorReport.java   | 35 +++++++++++++--
 .../druid/msq/input/ParseExceptionUtils.java       |  5 ++-
 .../druid/msq/exec/MSQParseExceptionsTest.java     | 38 ++++++++++++++++
 5 files changed, 123 insertions(+), 8 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 19733dea0a..d883a587e9 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -55,6 +55,7 @@ import org.apache.druid.frame.key.RowKey;
 import org.apache.druid.frame.key.RowKeyReader;
 import org.apache.druid.frame.processor.FrameProcessorExecutor;
 import org.apache.druid.frame.util.DurableStorageUtils;
+import org.apache.druid.frame.write.InvalidNullByteException;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.LockGranularity;
@@ -104,6 +105,7 @@ import 
org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
 import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
 import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
 import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
+import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.MSQFault;
@@ -412,9 +414,15 @@ public class ControllerImpl implements Controller
       final String selfHost = MSQTasks.getHostFromSelfNode(selfDruidNode);
       final MSQErrorReport controllerError =
           exceptionEncountered != null
-          ? MSQErrorReport.fromException(id(), selfHost, null, 
exceptionEncountered)
+          ? MSQErrorReport.fromException(
+              id(),
+              selfHost,
+              null,
+              exceptionEncountered,
+              task.getQuerySpec().getColumnMappings()
+          )
           : null;
-      final MSQErrorReport workerError = workerErrorRef.get();
+      MSQErrorReport workerError = workerErrorRef.get();
 
       taskStateForReport = TaskState.FAILED;
       errorForReport = MSQTasks.makeErrorReport(id(), selfHost, 
controllerError, workerError);
@@ -748,7 +756,10 @@ public class ControllerImpl implements Controller
         !workerTaskLauncher.isTaskLatest(errorReport.getTaskId())) {
       log.info("Ignoring task %s", errorReport.getTaskId());
     } else {
-      workerErrorRef.compareAndSet(null, errorReport);
+      workerErrorRef.compareAndSet(
+          null,
+          mapQueryColumnNameToOutputColumnName(errorReport)
+      );
     }
   }
 
@@ -2651,6 +2662,39 @@ public class ControllerImpl implements Controller
     return mergeMode;
   }
 
+  /**
+   * Maps the query column names (used internally while generating the query 
plan) to output column names (the one used
+   * by the user in the SQL query) for certain errors reported by workers 
(where they have limited knowledge of the
+   * ColumnMappings). For remaining errors not relying on the query column 
names, it returns it as is.
+   */
+  @Nullable
+  private MSQErrorReport mapQueryColumnNameToOutputColumnName(
+      @Nullable final MSQErrorReport workerErrorReport
+  )
+  {
+
+    if (workerErrorReport == null) {
+      return null;
+    } else if (workerErrorReport.getFault() instanceof InvalidNullByteFault) {
+      InvalidNullByteFault inbf = (InvalidNullByteFault) 
workerErrorReport.getFault();
+      return MSQErrorReport.fromException(
+          workerErrorReport.getTaskId(),
+          workerErrorReport.getHost(),
+          workerErrorReport.getStageNumber(),
+          InvalidNullByteException.builder()
+                                  .source(inbf.getSource())
+                                  .rowNumber(inbf.getRowNumber())
+                                  .column(inbf.getColumn())
+                                  .value(inbf.getValue())
+                                  .position(inbf.getPosition())
+                                  .build(),
+          task.getQuerySpec().getColumnMappings()
+      );
+    } else {
+      return workerErrorReport;
+    }
+  }
+
 
   /**
    * Interface used by {@link #contactWorkersForStage}.
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java
index 3b2a1881fa..0e85e6817d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java
@@ -63,7 +63,8 @@ public class InvalidNullByteFault extends BaseMSQFault
   {
     super(
         CODE,
-        "Invalid null byte at source [%s], rowNumber [%d], column[%s], 
value[%s], position[%d]. Consider sanitizing the string using REPLACE(\"%s\", 
U&'\\0000', '') AS %s",
+        "Invalid null byte at source [%s], rowNumber [%d], column[%s], 
value[%s], position[%d]. "
+        + "Consider sanitizing the input string column using REPLACE(\"%s\", 
U&'\\0000', '') AS %s",
         source,
         rowNumber,
         column,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
index 09d7c258ec..ffad2c0c80 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import it.unimi.dsi.fastutil.ints.IntList;
 import org.apache.druid.frame.processor.FrameRowTooLargeException;
 import org.apache.druid.frame.write.InvalidNullByteException;
 import org.apache.druid.frame.write.UnsupportedColumnTypeException;
@@ -31,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.msq.statistics.TooManyBucketsException;
 import 
org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
@@ -78,12 +80,23 @@ public class MSQErrorReport
       @Nullable final Integer stageNumber,
       final Throwable e
   )
+  {
+    return fromException(taskId, host, stageNumber, e, null);
+  }
+
+  public static MSQErrorReport fromException(
+      final String taskId,
+      @Nullable final String host,
+      @Nullable final Integer stageNumber,
+      final Throwable e,
+      @Nullable final ColumnMappings columnMappings
+  )
   {
     return new MSQErrorReport(
         taskId,
         host,
         stageNumber,
-        getFaultFromException(e),
+        getFaultFromException(e, columnMappings),
         Throwables.getStackTraceAsString(e)
     );
   }
@@ -159,12 +172,17 @@ public class MSQErrorReport
            '}';
   }
 
+  public static MSQFault getFaultFromException(@Nullable final Throwable e)
+  {
+    return getFaultFromException(e, null);
+  }
+
   /**
    * Magical code that extracts a useful fault from an exception, even if that 
exception is not necessarily a
    * {@link MSQException}. This method walks through the causal chain, and 
also "knows" about various exception
    * types thrown by other Druid code.
    */
-  public static MSQFault getFaultFromException(@Nullable final Throwable e)
+  public static MSQFault getFaultFromException(@Nullable final Throwable e, 
@Nullable final ColumnMappings columnMappings)
   {
     // Unwrap exception wrappers to find an underlying fault. The assumption 
here is that the topmost recognizable
     // exception should be used to generate the fault code for the entire 
report.
@@ -195,10 +213,21 @@ public class MSQErrorReport
         return new RowTooLargeFault(((FrameRowTooLargeException) 
cause).getMaxFrameSize());
       } else if (cause instanceof InvalidNullByteException) {
         InvalidNullByteException invalidNullByteException = 
(InvalidNullByteException) cause;
+        String columnName = invalidNullByteException.getColumn();
+        if (columnMappings != null) {
+          IntList outputColumnsForQueryColumn = 
columnMappings.getOutputColumnsForQueryColumn(columnName);
+
+          // outputColumnsForQueryColumn.size should always be 1 due to 
hasUniqueOutputColumnNames check that is done
+          if (outputColumnsForQueryColumn.size() >= 1) {
+            int outputColumn = outputColumnsForQueryColumn.getInt(0);
+            columnName = columnMappings.getOutputColumnName(outputColumn);
+          }
+        }
+
         return new InvalidNullByteFault(
             invalidNullByteException.getSource(),
             invalidNullByteException.getRowNumber(),
-            invalidNullByteException.getColumn(),
+            columnName,
             invalidNullByteException.getValue(),
             invalidNullByteException.getPosition()
         );
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java
index bfc1769868..0911059b36 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java
@@ -43,7 +43,10 @@ public class ParseExceptionUtils
   public static String 
generateReadableInputSourceNameFromMappedSegment(Segment segment)
   {
     if (segment instanceof ExternalSegment) {
-      return StringUtils.format("external input source: %s", 
((ExternalSegment) segment).externalInputSource().toString());
+      return StringUtils.format(
+          "external input source: %s",
+          ((ExternalSegment) segment).externalInputSource().toString()
+      );
     } else if (segment instanceof LookupSegment) {
       return StringUtils.format("lookup input source: %s", 
segment.getId().getDataSource());
     } else if (segment instanceof QueryableIndexSegment) {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
index 3f4c953bcb..61a89f0d6f 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
@@ -103,6 +103,44 @@ public class MSQParseExceptionsTest extends MSQTestBase
         .verifyResults();
   }
 
+  @Test
+  public void testIngestWithNullByteInSqlExpression()
+  {
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("desc", ColumnType.STRING)
+                                            .add("text", ColumnType.STRING)
+                                            .build();
+
+    testIngestQuery()
+        .setSql(""
+                + "WITH \"ext\" AS (SELECT *\n"
+                + "FROM TABLE(\n"
+                + "  EXTERN(\n"
+                + "    '{\"type\":\"inline\",\"data\":\"{\\\"desc\\\":\\\"Row 
with NULL\\\",\\\"text\\\":\\\"There is a null in\\\\u0000 here 
somewhere\\\"}\\n\"}',\n"
+                + "    '{\"type\":\"json\"}'\n"
+                + "  )\n"
+                + ") EXTEND (\"desc\" VARCHAR, \"text\" VARCHAR))\n"
+                + "SELECT\n"
+                + "  \"desc\",\n"
+                + "  REPLACE(\"text\", 'a', 'A') AS \"text\"\n"
+                + "FROM \"ext\"\n"
+                + "")
+        .setExpectedRowSignature(rowSignature)
+        .setExpectedDataSource("foo1")
+        .setExpectedMSQFault(
+            new InvalidNullByteFault(
+                "external input source: 
InlineInputSource{data='{\"desc\":\"Row with NULL\",\"text\":\"There is a null 
in\\u0000 here somewhere\"}\n'}",
+                1,
+                "text",
+                "There is A null in\u0000 here somewhere",
+                18
+            )
+        )
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
+        .verifyResults();
+  }
+
   @Test
   public void testIngestWithSanitizedNullByte() throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to