This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a2a12d79b rerwrite node so dynamic parameter applies to ingest node 
as well. (#17126)
37a2a12d79b is described below

commit 37a2a12d79bb56b16d45484ac9923eed8da5b934
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Sep 23 12:49:46 2024 -0700

    rerwrite node so dynamic parameter applies to ingest node as well. (#17126)
---
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 67 ++++++++++++++++++++++
 .../org/apache/druid/msq/exec/MSQSelectTest.java   | 49 ++++++++++++++++
 .../org/apache/druid/msq/test/MSQTestBase.java     | 21 ++++---
 .../druid/sql/calcite/planner/DruidPlanner.java    | 24 +++++++-
 .../druid/sql/calcite/planner/IngestHandler.java   |  1 -
 .../druid/sql/calcite/planner/QueryHandler.java    | 20 +------
 6 files changed, 154 insertions(+), 28 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 2ae3eaf0383..8f8a0b569c9 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.msq.exec;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.remote.TypedValue;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -704,6 +706,71 @@ public class MSQReplaceTest extends MSQTestBase
                      .verifyResults();
   }
 
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testReplaceWithDynamicParameters(String contextName, Map<String, 
Object> context)
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("m1", ColumnType.FLOAT)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         " REPLACE INTO foo OVERWRITE WHERE __time >= ? AND 
__time < ? "
+                         + "SELECT __time, m1 "
+                         + "FROM foo "
+                         + "WHERE __time >= ? AND __time < ? "
+                         + "PARTITIONED by DAY ")
+                     .setDynamicParameters(ImmutableList.of(
+                         
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, 
DateTimes.of("2000-01-02").getMillis()),
+                         
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, 
DateTimes.of("2000-01-03").getMillis()),
+                         
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, 
DateTimes.of("2000-01-02").getMillis()),
+                         
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, 
DateTimes.of("2000-01-03").getMillis())
+                     ))
+                     .setExpectedDataSource("foo")
+                     
.setExpectedDestinationIntervals(ImmutableList.of(Intervals.of(
+                         "2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")))
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedSegments(ImmutableSet.of(SegmentId.of(
+                         "foo",
+                         Intervals.of("2000-01-02T/P1D"),
+                         "test",
+                         0
+                     )))
+                     .setExpectedResultRows(ImmutableList.of(new 
Object[]{946771200000L, 2.0f}))
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().totalFiles(1),
+                         0, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(1).frames(1),
+                         0, 0, "shuffle"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(1).frames(1),
+                         1, 0, "input0"
+                     )
+                     
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+                         CounterSnapshotMatcher
+                             .with().segmentRowsProcessed(1),
+                         1, 0
+                     )
+                     .setExpectedLastCompactionState(
+                         expectedCompactionState(
+                             context,
+                             Collections.emptyList(),
+                             Collections.singletonList(new 
FloatDimensionSchema("m1")),
+                             GranularityType.DAY,
+                             Intervals.of("2000-01-02T/P1D")
+                         )
+                     )
+                     .verifyResults();
+  }
+
   @MethodSource("data")
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testReplaceOnFoo1WithAllExtern(String contextName, Map<String, 
Object> context) throws IOException
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index a5822f3a0b7..91a1983bfd6 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.msq.exec;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.remote.TypedValue;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.InlineInputSource;
@@ -409,6 +411,53 @@ public class MSQSelectTest extends MSQTestBase
         .verifyResults();
   }
 
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testSelectWithDynamicParameters(String contextName, Map<String, 
Object> context)
+  {
+    RowSignature resultSignature = RowSignature.builder()
+                                               .add("cnt", ColumnType.LONG)
+                                               .add("dim1", ColumnType.STRING)
+                                               .build();
+
+    // Filter [__time >= timestamp '3000-01-01 00:00:00'] matches no segments 
at all.
+    testSelectQuery()
+        .setSql("select cnt,dim1 from foo where __time >= ?")
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(
+                       newScanQueryBuilder()
+                           .dataSource(CalciteTests.DATASOURCE1)
+                           .intervals(
+                               querySegmentSpec(
+                                   Intervals.utc(
+                                       DateTimes.of("3000").getMillis(),
+                                       Intervals.ETERNITY.getEndMillis()
+                                   )
+                               )
+                           )
+                           .columns("cnt", "dim1")
+                           .context(defaultScanQueryContext(context, 
resultSignature))
+                           .build()
+                   )
+                   .columnMappings(ColumnMappings.identity(resultSignature))
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .destination(isDurableStorageDestination(contextName, 
context)
+                                ? DurableStorageMSQDestination.INSTANCE
+                                : TaskReportMSQDestination.INSTANCE)
+                   .build()
+        )
+        .setDynamicParameters(
+            ImmutableList.of(
+                TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, 
DateTimes.of("3000-01-01").getMillis())
+            )
+        )
+        .setQueryContext(context)
+        .setExpectedRowSignature(resultSignature)
+        .setExpectedResultRows(ImmutableList.of())
+        .verifyResults();
+  }
+
   @MethodSource("data")
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testSelectOnFooWhereMatchesNoData(String contextName, 
Map<String, Object> context)
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index e1ce49d8292..761a61337ea 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -36,6 +36,7 @@ import com.google.inject.Module;
 import com.google.inject.TypeLiteral;
 import com.google.inject.util.Modules;
 import com.google.inject.util.Providers;
+import org.apache.calcite.avatica.remote.TypedValue;
 import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.collections.ResourceHolder;
@@ -217,7 +218,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -784,13 +784,13 @@ public class MSQTestBase extends BaseCalciteQueryTest
     );
   }
 
-  private String runMultiStageQuery(String query, Map<String, Object> context)
+  private String runMultiStageQuery(String query, Map<String, Object> context, 
List<TypedValue> parameters)
   {
     final DirectStatement stmt = sqlStatementFactory.directStatement(
         new SqlQueryPlus(
             query,
             context,
-            Collections.emptyList(),
+            parameters,
             CalciteTests.REGULAR_USER_AUTH_RESULT
         )
     );
@@ -886,6 +886,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
     protected String sql = null;
     protected MSQControllerTask taskSpec = null;
     protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
+    protected List<TypedValue> dynamicParameters = new ArrayList<>();
     protected List<MSQResultsReport.ColumnAndType> expectedRowSignature = null;
     protected MSQSpec expectedMSQSpec = null;
     protected MSQTuningConfig expectedTuningConfig = null;
@@ -924,6 +925,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
       return asBuilder();
     }
 
+    public Builder setDynamicParameters(List<TypedValue> dynamicParameters)
+    {
+      this.dynamicParameters = dynamicParameters;
+      return asBuilder();
+    }
+
     public Builder 
setExpectedRowSignature(List<MSQResultsReport.ColumnAndType> 
expectedRowSignature)
     {
       Preconditions.checkArgument(!expectedRowSignature.isEmpty(), "Row 
signature cannot be empty");
@@ -1057,7 +1064,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
 
       final Throwable e = Assert.assertThrows(
           Throwable.class,
-          () -> runMultiStageQuery(sql, queryContext)
+          () -> runMultiStageQuery(sql, queryContext, dynamicParameters)
       );
 
       assertThat(e, expectedValidationErrorMatcher);
@@ -1209,7 +1216,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
         String controllerId;
         if (sql != null) {
           // Run the sql command.
-          controllerId = runMultiStageQuery(sql, queryContext);
+          controllerId = runMultiStageQuery(sql, queryContext, 
dynamicParameters);
         } else {
           // Run the task spec directly instead.
           controllerId = TEST_CONTROLLER_TASK_ID;
@@ -1426,7 +1433,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
       try {
         String controllerId;
         if (sql != null) {
-          controllerId = runMultiStageQuery(sql, queryContext);
+          controllerId = runMultiStageQuery(sql, queryContext, 
dynamicParameters);
         } else {
           // Run the task spec directly instead.
           controllerId = TEST_CONTROLLER_TASK_ID;
@@ -1468,7 +1475,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
       Preconditions.checkArgument(sql == null || queryContext != null, 
"queryContext cannot be null");
 
       try {
-        String controllerId = runMultiStageQuery(sql, queryContext);
+        String controllerId = runMultiStageQuery(sql, queryContext, 
dynamicParameters);
 
         if (expectedMSQFault != null || expectedMSQFaultClass != null) {
           MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index cf1d22eb39b..03ef94656c5 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -64,7 +64,6 @@ import java.util.function.Function;
  */
 public class DruidPlanner implements Closeable
 {
-
   public static final Joiner SPACE_JOINER = Joiner.on(" ");
   public static final Joiner COMMA_JOINER = Joiner.on(", ");
 
@@ -148,6 +147,7 @@ public class DruidPlanner implements Closeable
     catch (SqlParseException e1) {
       throw translateException(e1);
     }
+    root = rewriteParameters(root);
     hook.captureSqlNode(root);
     handler = createHandler(root);
     handler.validate();
@@ -158,6 +158,7 @@ public class DruidPlanner implements Closeable
   private SqlStatementHandler createHandler(final SqlNode node)
   {
     SqlNode query = node;
+
     SqlExplain explain = null;
     if (query.getKind() == SqlKind.EXPLAIN) {
       explain = (SqlExplain) query;
@@ -179,6 +180,27 @@ public class DruidPlanner implements Closeable
     throw InvalidSqlInput.exception("Unsupported SQL statement [%s]", 
node.getKind());
   }
 
+  /**
+   * Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap 
out any
+   * {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link 
org.apache.calcite.sql.SqlLiteral}
+   *  replacement.
+   *
+   * @return a rewritten {@link SqlNode} with any dynamic parameters rewritten 
in the provided {@code original} node,
+   * if they were present.
+   */
+  private SqlNode rewriteParameters(final SqlNode original)
+  {
+    // Parameter replacement is done only if the client provides parameter 
values.
+    // If this is a PREPARE-only, then there will be no values even if the 
statement contains
+    // parameters. If this is a PLAN, then we'll catch later the case that the 
statement
+    // contains parameters, but no values were provided.
+    if (plannerContext.getParameters().isEmpty()) {
+      return original;
+    } else {
+      return original.accept(new SqlParameterizerShuttle(plannerContext)); // 
the rewrite happens here.
+    }
+  }
+
   /**
    * Prepare a SQL query for execution, including some initial parsing and
    * validation and any dynamic parameter type resolution, to support prepared
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
index 92f2ef2ea81..e0b5ffdb08e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
@@ -341,7 +341,6 @@ public abstract class IngestHandler extends QueryHandler
     protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode)
     {
       SqlNode query = convertSourceQuery(sqlNode);
-
       return DruidSqlReplace.create(
           new SqlInsert(
               sqlNode.getParserPosition(),
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
index dfc374f51bb..a915833cd3f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
@@ -113,7 +113,7 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
     CalcitePlanner planner = handlerContext.planner();
     SqlNode validatedQueryNode;
     try {
-      validatedQueryNode = planner.validate(rewriteParameters(root));
+      validatedQueryNode = planner.validate(root);
     }
     catch (ValidationException e) {
       throw DruidPlanner.translateException(e);
@@ -129,24 +129,6 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
     return validatedQueryNode;
   }
 
-  private SqlNode rewriteParameters(SqlNode original)
-  {
-    // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap 
out any
-    // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link 
SqlLiteral}
-    // replacement.
-    //
-    // Parameter replacement is done only if the client provides parameter 
values.
-    // If this is a PREPARE-only, then there will be no values even if the 
statement contains
-    // parameters. If this is a PLAN, then we'll catch later the case that the 
statement
-    // contains parameters, but no values were provided.
-    PlannerContext plannerContext = handlerContext.plannerContext();
-    if (plannerContext.getParameters().isEmpty()) {
-      return original;
-    } else {
-      return original.accept(new SqlParameterizerShuttle(plannerContext));
-    }
-  }
-
   @Override
   public void prepare()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to