This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e616e633a Determine type of `__time` column by RowSignature in case of External Datasource (#12770)
2e616e633a is described below

commit 2e616e633a1b656d346f3e1236bd08dd6e625f19
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Jul 26 12:09:40 2022 +0530

    Determine type of `__time` column by RowSignature in case of External Datasource (#12770)
    
    Some queries like `REPLACE INTO ... SELECT TIME_PARSE("__time") AS __time FROM ...`
    fail at the Calcite layer because any column named `__time` is treated as
    `SqlTypeName.TIMESTAMP`, regardless of the type declared in the row signature.
    
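    Changes:
    - Add a flag to `RowSignatures.toRelDataType()` that controls whether the `__time`
      column is forced to TIMESTAMP. `DruidTable` turns it off for external datasources,
      so their `__time` type comes from the RowSignature.
    - Reject `EXTERN` row signatures that declare `__time` with a type other than LONG.
    - Add a test for an external datasource whose `__time` column is declared as STRING.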
---
 .../sql/calcite/external/ExternalTableMacro.java   | 14 +++++++++++
 .../apache/druid/sql/calcite/table/DruidTable.java |  7 +++++-
 .../druid/sql/calcite/table/RowSignatures.java     | 18 ++++++++++++--
 .../druid/sql/calcite/CalciteInsertDmlTest.java    | 28 ++++++++++++++++++++++
 4 files changed, 64 insertions(+), 3 deletions(-)

diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java
index 5e73a54825..57c84ef73a 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalTableMacro.java
@@ -31,10 +31,14 @@ import org.apache.calcite.schema.TranslatableTable;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.table.DruidTable;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Used by {@link ExternalOperatorConversion} to generate {@link DruidTable} 
that reference {@link ExternalDataSource}.
@@ -59,6 +63,16 @@ public class ExternalTableMacro implements TableMacro
       final InputFormat inputFormat = jsonMapper.readValue((String) 
arguments.get(1), InputFormat.class);
       final RowSignature signature = jsonMapper.readValue((String) 
arguments.get(2), RowSignature.class);
 
+      // Prevent a RowSignature that has a ColumnSignature with name "__time" 
and type that is not LONG because it
+      // will be automatically casted to LONG while processing in 
RowBasedColumnSelectorFactory.
+      // This can cause an issue when the incorrectly typecasted data is 
ingested or processed upon. One such example
+      // of inconsistency is that functions such as TIME_PARSE evaluate 
incorrectly
+      Optional<ColumnType> timestampColumnTypeOptional = 
signature.getColumnType(ColumnHolder.TIME_COLUMN_NAME);
+      if (timestampColumnTypeOptional.isPresent() && 
!timestampColumnTypeOptional.get().equals(ColumnType.LONG)) {
+        throw new ISE("EXTERN function with __time column can be used when 
__time column is of type long. "
+                      + "Please change the column name to something other than 
__time");
+      }
+
       return new DruidTable(
           new ExternalDataSource(inputSource, inputFormat, signature),
           signature,
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
index dfbf9117cb..4a49001244 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java
@@ -106,7 +106,12 @@ public class DruidTable implements TranslatableTable
   @Override
   public RelDataType getRowType(final RelDataTypeFactory typeFactory)
   {
-    return RowSignatures.toRelDataType(rowSignature, typeFactory);
+    // For external datasources, the row type should be determined by whatever 
the row signature has been explicitly
+    // passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to 
inconsistencies with the Calcite functions
+    // For example, TIME_PARSE(__time) where __time is specified to be a 
string field in the external datasource
+    // would lead to an exception because __time would be interpreted as 
timestamp if we typecast it.
+    boolean typecastTimeColumn = !(dataSource instanceof ExternalDataSource);
+    return RowSignatures.toRelDataType(rowSignature, typeFactory, 
typecastTimeColumn);
   }
 
   @Override
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java
index 73c2527d6f..04d49573ce 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java
@@ -87,16 +87,30 @@ public class RowSignatures
   }
 
   /**
-   * Returns a Calcite RelDataType corresponding to a row signature.
+   * Returns a Calcite RelDataType corresponding to a row signature. It will 
typecast __time column to TIMESTAMP
+   * irrespective of the type present in the row signature
    */
   public static RelDataType toRelDataType(final RowSignature rowSignature, 
final RelDataTypeFactory typeFactory)
+  {
+    return toRelDataType(rowSignature, typeFactory, true);
+  }
+
+  /**
+   * Returns a Calcite RelDataType corresponding to a row signature.
+   * For columns that are named "__time", it automatically casts it to 
TIMESTAMP if typecastTimeColumn is set to true
+   */
+  public static RelDataType toRelDataType(
+      final RowSignature rowSignature,
+      final RelDataTypeFactory typeFactory,
+      boolean typecastTimeColumn
+  )
   {
     final RelDataTypeFactory.Builder builder = typeFactory.builder();
     final boolean nullNumeric = !NullHandling.replaceWithDefault();
     for (final String columnName : rowSignature.getColumnNames()) {
       final RelDataType type;
 
-      if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
+      if (typecastTimeColumn && 
ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
         type = Calcites.createSqlType(typeFactory, SqlTypeName.TIMESTAMP);
       } else {
         final ColumnType columnType =
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 67865c076f..4f56eb9fa1 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.sql.calcite;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -35,6 +37,7 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.SqlPlanningException;
+import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
@@ -758,6 +761,31 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
         .verify();
   }
 
+  @Test
+  public void 
testInsertOnExternalDataSourceWithIncompatibleTimeColumnSignature()
+  {
+    ExternalDataSource restrictedSignature = new ExternalDataSource(
+        new InlineInputSource("100\nc200\n"),
+        new CsvInputFormat(ImmutableList.of("__time"), null, false, false, 0),
+        RowSignature.builder()
+                    .add("__time", ColumnType.STRING)
+                    .build()
+    );
+    testIngestionQuery()
+        .sql(
+            "INSERT INTO dst SELECT __time FROM %s PARTITIONED BY ALL TIME",
+            externSql(restrictedSignature)
+        )
+        .expectValidationError(
+            CoreMatchers.allOf(
+                CoreMatchers.instanceOf(SqlPlanningException.class),
+                ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
+                    "EXTERN function with __time column can be used when 
__time column is of type long"))
+            )
+        )
+        .verify();
+  }
+
   @Test
   public void testInsertWithSqlOuterLimit()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to