This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a4fb585436 Record column name for exceptions while writing frames in 
RowBasedFrameWriter (#16130)
9a4fb585436 is described below

commit 9a4fb585436845990563092d1ed124c255dc925a
Author: Vishesh Garg <[email protected]>
AuthorDate: Tue Apr 9 15:39:10 2024 +0530

    Record column name for exceptions while writing frames in 
RowBasedFrameWriter (#16130)
    
    Currently, runtime exceptions thrown while writing frames include only the 
exception itself, not the name of the column in which they were encountered. 
This patch adds the column name to the error and makes the error 
non-retryable.
---
 docs/multi-stage-query/reference.md                |   1 +
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  15 +++
 .../apache/druid/msq/guice/MSQIndexingModule.java  |   2 +
 .../msq/indexing/error/InvalidFieldFault.java      | 128 ++++++++++++++++++
 .../druid/msq/indexing/error/MSQErrorReport.java   |  28 +++-
 .../msq/querykit/scan/ScanQueryFrameProcessor.java |   8 ++
 .../msq/indexing/error/MSQFaultSerdeTest.java      |   1 +
 .../druid/frame/write/InvalidFieldException.java   | 144 +++++++++++++++++++++
 .../druid/frame/write/RowBasedFrameWriter.java     |   9 +-
 9 files changed, 333 insertions(+), 3 deletions(-)

diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index 19b1740b9d4..2e0e7b5437e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -581,6 +581,7 @@ The following table describes error codes you may encounter 
in the `multiStageQu
 | <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or 
REPLACE query was canceled by a higher-priority ingestion job, such as a 
real-time ingestion task. | |
 | <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE 
query encountered a null timestamp in the `__time` field.<br /><br />This can 
happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a 
timestamp that cannot be parsed. 
([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null 
when it cannot parse a timestamp.) In this case, try parsing your timestamps 
using a different function or pattern. Or, if your timestamps may g [...]
 | <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A 
REPLACE query generated a timestamp outside the bounds of the TIMESTAMP 
parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, 
verify that the you specified is valid. | `interval`: time chunk interval 
corresponding to the out-of-bounds timestamp |
+| <a name="error_InvalidField">`InvalidField`</a> | An error was encountered 
while writing a field. | `error`: Encountered error. <br /><br /> `source`: 
Source for the error. <br /><br /> `rowNumber`: Row number (1-indexed) for the 
error. <br /><br /> `column`: Column for the error. |
 | <a name="error_InvalidNullByte">`InvalidNullByte`</a> | A string column 
included a null byte. Null bytes in strings are not permitted. |`source`: The 
source that included the null byte <br /><br /> `rowNumber`: The row number 
(1-indexed) that included the null byte <br /><br /> `column`: The column that 
included the null byte <br /><br /> `value`: Actual string containing the null 
byte <br /><br /> `position`: Position (1-indexed) of occurrence of null byte|
 | <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could 
not translate the provided native query to a multi-stage query.<br /> <br 
/>This can happen if the query uses features that aren't supported, like 
GROUPING SETS. | |
 | <a name="error_QueryRuntimeError">`QueryRuntimeError`</a> | MSQ uses the 
native query engine to run the leaf stages. This error tells MSQ that error is 
in native query runtime.<br /> <br /> Since this is a generic error, the user 
needs to look at logs for the error message and stack trace to figure out the 
next course of action. If the user is stuck, consider raising a `github` issue 
for assistance. |  `baseErrorMessage` error message from the native query 
runtime. |
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c29259e318c..ed4a2de8613 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -59,6 +59,7 @@ import org.apache.druid.frame.key.RowKey;
 import org.apache.druid.frame.key.RowKeyReader;
 import org.apache.druid.frame.processor.FrameProcessorExecutor;
 import org.apache.druid.frame.util.DurableStorageUtils;
+import org.apache.druid.frame.write.InvalidFieldException;
 import org.apache.druid.frame.write.InvalidNullByteException;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -3007,6 +3008,20 @@ public class ControllerImpl implements Controller
                                   .build(),
           task.getQuerySpec().getColumnMappings()
       );
+    } else if (workerErrorReport.getFault() instanceof InvalidFieldException) {
+      InvalidFieldException ife = (InvalidFieldException) 
workerErrorReport.getFault();
+      return MSQErrorReport.fromException(
+          workerErrorReport.getTaskId(),
+          workerErrorReport.getHost(),
+          workerErrorReport.getStageNumber(),
+          InvalidFieldException.builder()
+                               .source(ife.getSource())
+                               .rowNumber(ife.getRowNumber())
+                               .column(ife.getColumn())
+                               .errorMsg(ife.getErrorMsg())
+                               .build(),
+          task.getQuerySpec().getColumnMappings()
+      );
     } else {
       return workerErrorReport;
     }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index f23f58d9c80..125a66331e6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -47,6 +47,7 @@ import 
org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
 import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
 import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
 import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
+import org.apache.druid.msq.indexing.error.InvalidFieldFault;
 import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
 import org.apache.druid.msq.indexing.error.MSQFault;
 import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
@@ -118,6 +119,7 @@ public class MSQIndexingModule implements DruidModule
       InsertLockPreemptedFault.class,
       InsertTimeNullFault.class,
       InsertTimeOutOfBoundsFault.class,
+      InvalidFieldFault.class,
       InvalidNullByteFault.class,
       NotEnoughTemporaryStorageFault.class,
       NotEnoughMemoryFault.class,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidFieldFault.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidFieldFault.java
new file mode 100644
index 00000000000..09834fee20f
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidFieldFault.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.error;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+@JsonTypeName(InvalidFieldFault.CODE)
+public class InvalidFieldFault extends BaseMSQFault
+{
+  static final String CODE = "InvalidField";
+
+  @Nullable
+  private final String source;
+  @Nullable
+  private final String column;
+  @Nullable
+  private final Integer rowNumber;
+  @Nullable
+  private final String errorMsg;
+  @Nullable
+  private final String logMsg;
+
+  public InvalidFieldFault(
+      @Nullable @JsonProperty("source") String source,
+      @Nullable @JsonProperty("column") String column,
+      @Nullable @JsonProperty("rowNumber") Integer rowNumber,
+      @Nullable @JsonProperty("errorMsg") String errorMsg,
+      @Nullable @JsonProperty("logMsg") String logMsg
+  )
+  {
+    super(
+        CODE,
+        logMsg
+    );
+    this.column = column;
+    this.rowNumber = rowNumber;
+    this.source = source;
+    this.errorMsg = errorMsg;
+    this.logMsg = logMsg;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Integer getRowNumber()
+  {
+    return rowNumber;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getSource()
+  {
+    return source;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getErrorMsg()
+  {
+    return errorMsg;
+  }
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getLogMsg()
+  {
+    return logMsg;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    InvalidFieldFault that = (InvalidFieldFault) o;
+    return Objects.equals(source, that.source)
+           && Objects.equals(column, that.column)
+           && Objects.equals(rowNumber, that.rowNumber)
+           && Objects.equals(errorMsg, that.errorMsg)
+           && Objects.equals(logMsg, that.logMsg);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), source, column, rowNumber, errorMsg, 
logMsg);
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
index 2725600c49c..8d90bef32ff 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import it.unimi.dsi.fastutil.ints.IntList;
 import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.write.InvalidFieldException;
 import org.apache.druid.frame.write.InvalidNullByteException;
 import org.apache.druid.frame.write.UnsupportedColumnTypeException;
 import org.apache.druid.indexing.common.task.batch.TooManyBucketsException;
@@ -182,7 +183,10 @@ public class MSQErrorReport
    * {@link MSQException}. This method walks through the causal chain, and 
also "knows" about various exception
    * types thrown by other Druid code.
    */
-  public static MSQFault getFaultFromException(@Nullable final Throwable e, 
@Nullable final ColumnMappings columnMappings)
+  public static MSQFault getFaultFromException(
+      @Nullable final Throwable e,
+      @Nullable final ColumnMappings columnMappings
+  )
   {
     // Unwrap exception wrappers to find an underlying fault. The assumption 
here is that the topmost recognizable
     // exception should be used to generate the fault code for the entire 
report.
@@ -231,6 +235,28 @@ public class MSQErrorReport
             invalidNullByteException.getValue(),
             invalidNullByteException.getPosition()
         );
+
+      } else if (cause instanceof InvalidFieldException) {
+        InvalidFieldException invalidFieldException = (InvalidFieldException) 
cause;
+        String columnName = invalidFieldException.getColumn();
+        if (columnMappings != null) {
+          IntList outputColumnsForQueryColumn = 
columnMappings.getOutputColumnsForQueryColumn(columnName);
+
+          // outputColumnsForQueryColumn.size should always be 1 due to 
hasUniqueOutputColumnNames check that is done
+          if (!outputColumnsForQueryColumn.isEmpty()) {
+            int outputColumn = outputColumnsForQueryColumn.getInt(0);
+            columnName = columnMappings.getOutputColumnName(outputColumn);
+          }
+        }
+
+        return new InvalidFieldFault(
+            invalidFieldException.getSource(),
+            columnName,
+            invalidFieldException.getRowNumber(),
+            invalidFieldException.getErrorMsg(),
+            invalidFieldException.getMessage()
+        );
+
       } else if (cause instanceof UnexpectedMultiValueDimensionException) {
         return new QueryRuntimeFault(StringUtils.format(
             "Column [%s] is a multi-value string. Please wrap the column using 
MV_TO_ARRAY() to proceed further.",
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 00fe7f8d8db..c570fdc29b9 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -38,6 +38,7 @@ import org.apache.druid.frame.segment.FrameSegment;
 import org.apache.druid.frame.util.SettableLongVirtualColumn;
 import org.apache.druid.frame.write.FrameWriter;
 import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.InvalidFieldException;
 import org.apache.druid.frame.write.InvalidNullByteException;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -345,6 +346,13 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
                  .rowNumber(this.cursorOffset.getOffset() + 1)
                  .build();
     }
+    catch (InvalidFieldException ffwe) {
+      InvalidFieldException.Builder builder = 
InvalidFieldException.builder(ffwe);
+      throw
+          
builder.source(ParseExceptionUtils.generateReadableInputSourceNameFromMappedSegment(this.segment))
 // frame segment
+                 .rowNumber(this.cursorOffset.getOffset() + 1)
+                 .build();
+    }
     catch (Exception e) {
       throw Throwables.propagate(e);
     }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 4967ec6c31f..6ee9a5b5276 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -71,6 +71,7 @@ public class MSQFaultSerdeTest
         Collections.singletonList(Intervals.of("2000/2001"))
     ));
     assertFaultSerde(new InvalidNullByteFault("the source", 1, "the column", 
"the value", 2));
+    assertFaultSerde(new InvalidFieldFault("the source", "the column", 1, "the 
error", "the log msg"));
     assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
     assertFaultSerde(QueryNotSupportedFault.INSTANCE);
     assertFaultSerde(new QueryRuntimeFault("new error", "base error"));
diff --git 
a/processing/src/main/java/org/apache/druid/frame/write/InvalidFieldException.java
 
b/processing/src/main/java/org/apache/druid/frame/write/InvalidFieldException.java
new file mode 100644
index 00000000000..3a8bd133c79
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/frame/write/InvalidFieldException.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.write;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+
+public class InvalidFieldException extends RuntimeException
+{
+  @Nullable
+  private final String source;
+  @Nullable
+  private final String column;
+  @Nullable
+  private final Integer rowNumber;
+  @Nullable
+  private final String errorMsg;
+
+  private InvalidFieldException(
+      @Nullable @JsonProperty("source") String source,
+      @Nullable @JsonProperty("column") String column,
+      @Nullable @JsonProperty("rowNumber") Integer rowNumber,
+      @Nullable @JsonProperty("message") String errorMsg
+  )
+  {
+    super(StringUtils.format(
+        "Error[%s] while writing a field for source[%s], rowNumber[%d], 
column[%s].",
+        errorMsg,
+        source,
+        rowNumber,
+        column
+    ));
+    this.column = column;
+    this.rowNumber = rowNumber;
+    this.source = source;
+    this.errorMsg = errorMsg;
+  }
+
+  @Nullable
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @Nullable
+  public Integer getRowNumber()
+  {
+    return rowNumber;
+  }
+
+  @Nullable
+  public String getSource()
+  {
+    return source;
+  }
+
+  @Nullable
+  public String getErrorMsg()
+  {
+    return errorMsg;
+  }
+
+  public static Builder builder()
+  {
+    return new Builder();
+  }
+
+  public static Builder builder(InvalidFieldException invalidFieldException)
+  {
+    return new Builder(invalidFieldException);
+  }
+
+  public static class Builder
+  {
+    @Nullable
+    private String column;
+    @Nullable
+    private Integer rowNumber;
+    @Nullable
+    private String source;
+    @Nullable
+    private String errorMsg;
+
+    public Builder()
+    {
+    }
+
+    public Builder(InvalidFieldException invalidFieldException)
+    {
+      source = invalidFieldException.source;
+      rowNumber = invalidFieldException.rowNumber;
+      column = invalidFieldException.column;
+      errorMsg = invalidFieldException.errorMsg;
+    }
+
+    public InvalidFieldException build()
+    {
+      return new InvalidFieldException(source, column, rowNumber, errorMsg);
+    }
+
+    public Builder column(String val)
+    {
+      column = val;
+      return this;
+    }
+
+    public Builder rowNumber(Integer val)
+    {
+      rowNumber = val;
+      return this;
+    }
+
+    public Builder source(String val)
+    {
+      source = val;
+      return this;
+    }
+
+    public Builder errorMsg(String val)
+    {
+      errorMsg = val;
+      return this;
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/frame/write/RowBasedFrameWriter.java
 
b/processing/src/main/java/org/apache/druid/frame/write/RowBasedFrameWriter.java
index ba376c53275..beae50dca9d 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/write/RowBasedFrameWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/write/RowBasedFrameWriter.java
@@ -33,6 +33,7 @@ import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.utils.CloseableUtils;
 
@@ -299,11 +300,15 @@ public class RowBasedFrameWriter implements FrameWriter
                                       .column(signature.getColumnName(i))
                                       .build();
       }
+      catch (ParseException pe) {
+        throw Throwables.propagate(pe);
+      }
       catch (Exception e) {
-        throw Throwables.propagate(e);
+        throw 
InvalidFieldException.builder().column(signature.getColumnName(i))
+                                   .errorMsg(e.getMessage())
+                                   .build();
       }
 
-
       if (writeResult < 0) {
         // Reset to beginning of loop.
         i = -1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to