This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f9f734cde5 Display the output column name in InvalidNullByteException
(#14780)
f9f734cde5 is described below
commit f9f734cde5b6dd6e540b189645de0eaf6f18a091
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Aug 24 04:24:41 2023 +0000
Display the output column name in InvalidNullByteException (#14780)
This PR maps the query column name to the output column name when surfacing the
fault, since the output column name is the one the user sees when executing the query.
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 50 ++++++++++++++++++++--
.../msq/indexing/error/InvalidNullByteFault.java | 3 +-
.../druid/msq/indexing/error/MSQErrorReport.java | 35 +++++++++++++--
.../druid/msq/input/ParseExceptionUtils.java | 5 ++-
.../druid/msq/exec/MSQParseExceptionsTest.java | 38 ++++++++++++++++
5 files changed, 123 insertions(+), 8 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 19733dea0a..d883a587e9 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -55,6 +55,7 @@ import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.util.DurableStorageUtils;
+import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
@@ -104,6 +105,7 @@ import
org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
+import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFault;
@@ -412,9 +414,15 @@ public class ControllerImpl implements Controller
final String selfHost = MSQTasks.getHostFromSelfNode(selfDruidNode);
final MSQErrorReport controllerError =
exceptionEncountered != null
- ? MSQErrorReport.fromException(id(), selfHost, null,
exceptionEncountered)
+ ? MSQErrorReport.fromException(
+ id(),
+ selfHost,
+ null,
+ exceptionEncountered,
+ task.getQuerySpec().getColumnMappings()
+ )
: null;
- final MSQErrorReport workerError = workerErrorRef.get();
+ MSQErrorReport workerError = workerErrorRef.get();
taskStateForReport = TaskState.FAILED;
errorForReport = MSQTasks.makeErrorReport(id(), selfHost,
controllerError, workerError);
@@ -748,7 +756,10 @@ public class ControllerImpl implements Controller
!workerTaskLauncher.isTaskLatest(errorReport.getTaskId())) {
log.info("Ignoring task %s", errorReport.getTaskId());
} else {
- workerErrorRef.compareAndSet(null, errorReport);
+ workerErrorRef.compareAndSet(
+ null,
+ mapQueryColumnNameToOutputColumnName(errorReport)
+ );
}
}
@@ -2651,6 +2662,39 @@ public class ControllerImpl implements Controller
return mergeMode;
}
+ /**
+ * Maps the query column names (used internally while generating the query
plan) to output column names (the one used
+ * by the user in the SQL query) for certain errors reported by workers
(where they have limited knowledge of the
+ * ColumnMappings). For remaining errors not relying on the query column
names, it returns it as is.
+ */
+ @Nullable
+ private MSQErrorReport mapQueryColumnNameToOutputColumnName(
+ @Nullable final MSQErrorReport workerErrorReport
+ )
+ {
+
+ if (workerErrorReport == null) {
+ return null;
+ } else if (workerErrorReport.getFault() instanceof InvalidNullByteFault) {
+ InvalidNullByteFault inbf = (InvalidNullByteFault)
workerErrorReport.getFault();
+ return MSQErrorReport.fromException(
+ workerErrorReport.getTaskId(),
+ workerErrorReport.getHost(),
+ workerErrorReport.getStageNumber(),
+ InvalidNullByteException.builder()
+ .source(inbf.getSource())
+ .rowNumber(inbf.getRowNumber())
+ .column(inbf.getColumn())
+ .value(inbf.getValue())
+ .position(inbf.getPosition())
+ .build(),
+ task.getQuerySpec().getColumnMappings()
+ );
+ } else {
+ return workerErrorReport;
+ }
+ }
+
/**
* Interface used by {@link #contactWorkersForStage}.
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java
index 3b2a1881fa..0e85e6817d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java
@@ -63,7 +63,8 @@ public class InvalidNullByteFault extends BaseMSQFault
{
super(
CODE,
- "Invalid null byte at source [%s], rowNumber [%d], column[%s],
value[%s], position[%d]. Consider sanitizing the string using REPLACE(\"%s\",
U&'\\0000', '') AS %s",
+ "Invalid null byte at source [%s], rowNumber [%d], column[%s],
value[%s], position[%d]. "
+ + "Consider sanitizing the input string column using REPLACE(\"%s\",
U&'\\0000', '') AS %s",
source,
rowNumber,
column,
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
index 09d7c258ec..ffad2c0c80 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
@@ -31,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.statistics.TooManyBucketsException;
import
org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
import javax.annotation.Nullable;
import java.util.Objects;
@@ -78,12 +80,23 @@ public class MSQErrorReport
@Nullable final Integer stageNumber,
final Throwable e
)
+ {
+ return fromException(taskId, host, stageNumber, e, null);
+ }
+
+ public static MSQErrorReport fromException(
+ final String taskId,
+ @Nullable final String host,
+ @Nullable final Integer stageNumber,
+ final Throwable e,
+ @Nullable final ColumnMappings columnMappings
+ )
{
return new MSQErrorReport(
taskId,
host,
stageNumber,
- getFaultFromException(e),
+ getFaultFromException(e, columnMappings),
Throwables.getStackTraceAsString(e)
);
}
@@ -159,12 +172,17 @@ public class MSQErrorReport
'}';
}
+ public static MSQFault getFaultFromException(@Nullable final Throwable e)
+ {
+ return getFaultFromException(e, null);
+ }
+
/**
* Magical code that extracts a useful fault from an exception, even if that
exception is not necessarily a
* {@link MSQException}. This method walks through the causal chain, and
also "knows" about various exception
* types thrown by other Druid code.
*/
- public static MSQFault getFaultFromException(@Nullable final Throwable e)
+ public static MSQFault getFaultFromException(@Nullable final Throwable e,
@Nullable final ColumnMappings columnMappings)
{
// Unwrap exception wrappers to find an underlying fault. The assumption
here is that the topmost recognizable
// exception should be used to generate the fault code for the entire
report.
@@ -195,10 +213,21 @@ public class MSQErrorReport
return new RowTooLargeFault(((FrameRowTooLargeException)
cause).getMaxFrameSize());
} else if (cause instanceof InvalidNullByteException) {
InvalidNullByteException invalidNullByteException =
(InvalidNullByteException) cause;
+ String columnName = invalidNullByteException.getColumn();
+ if (columnMappings != null) {
+ IntList outputColumnsForQueryColumn =
columnMappings.getOutputColumnsForQueryColumn(columnName);
+
+ // outputColumnsForQueryColumn.size should always be 1 due to
hasUniqueOutputColumnNames check that is done
+ if (outputColumnsForQueryColumn.size() >= 1) {
+ int outputColumn = outputColumnsForQueryColumn.getInt(0);
+ columnName = columnMappings.getOutputColumnName(outputColumn);
+ }
+ }
+
return new InvalidNullByteFault(
invalidNullByteException.getSource(),
invalidNullByteException.getRowNumber(),
- invalidNullByteException.getColumn(),
+ columnName,
invalidNullByteException.getValue(),
invalidNullByteException.getPosition()
);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java
index bfc1769868..0911059b36 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java
@@ -43,7 +43,10 @@ public class ParseExceptionUtils
public static String
generateReadableInputSourceNameFromMappedSegment(Segment segment)
{
if (segment instanceof ExternalSegment) {
- return StringUtils.format("external input source: %s",
((ExternalSegment) segment).externalInputSource().toString());
+ return StringUtils.format(
+ "external input source: %s",
+ ((ExternalSegment) segment).externalInputSource().toString()
+ );
} else if (segment instanceof LookupSegment) {
return StringUtils.format("lookup input source: %s",
segment.getId().getDataSource());
} else if (segment instanceof QueryableIndexSegment) {
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
index 3f4c953bcb..61a89f0d6f 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
@@ -103,6 +103,44 @@ public class MSQParseExceptionsTest extends MSQTestBase
.verifyResults();
}
+ @Test
+ public void testIngestWithNullByteInSqlExpression()
+ {
+
+ RowSignature rowSignature = RowSignature.builder()
+ .add("desc", ColumnType.STRING)
+ .add("text", ColumnType.STRING)
+ .build();
+
+ testIngestQuery()
+ .setSql(""
+ + "WITH \"ext\" AS (SELECT *\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{\"type\":\"inline\",\"data\":\"{\\\"desc\\\":\\\"Row
with NULL\\\",\\\"text\\\":\\\"There is a null in\\\\u0000 here
somewhere\\\"}\\n\"}',\n"
+ + " '{\"type\":\"json\"}'\n"
+ + " )\n"
+ + ") EXTEND (\"desc\" VARCHAR, \"text\" VARCHAR))\n"
+ + "SELECT\n"
+ + " \"desc\",\n"
+ + " REPLACE(\"text\", 'a', 'A') AS \"text\"\n"
+ + "FROM \"ext\"\n"
+ + "")
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedDataSource("foo1")
+ .setExpectedMSQFault(
+ new InvalidNullByteFault(
+ "external input source:
InlineInputSource{data='{\"desc\":\"Row with NULL\",\"text\":\"There is a null
in\\u0000 here somewhere\"}\n'}",
+ 1,
+ "text",
+ "There is A null in\u0000 here somewhere",
+ 18
+ )
+ )
+ .setQueryContext(DEFAULT_MSQ_CONTEXT)
+ .verifyResults();
+ }
+
@Test
public void testIngestWithSanitizedNullByte() throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]