This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 37a2a12d79b rerwrite node so dynamic parameter applies to ingest node
as well. (#17126)
37a2a12d79b is described below
commit 37a2a12d79bb56b16d45484ac9923eed8da5b934
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Sep 23 12:49:46 2024 -0700
rerwrite node so dynamic parameter applies to ingest node as well. (#17126)
---
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 67 ++++++++++++++++++++++
.../org/apache/druid/msq/exec/MSQSelectTest.java | 49 ++++++++++++++++
.../org/apache/druid/msq/test/MSQTestBase.java | 21 ++++---
.../druid/sql/calcite/planner/DruidPlanner.java | 24 +++++++-
.../druid/sql/calcite/planner/IngestHandler.java | 1 -
.../druid/sql/calcite/planner/QueryHandler.java | 20 +------
6 files changed, 154 insertions(+), 28 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 2ae3eaf0383..8f8a0b569c9 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -704,6 +706,71 @@ public class MSQReplaceTest extends MSQTestBase
.verifyResults();
}
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testReplaceWithDynamicParameters(String contextName, Map<String,
Object> context)
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("m1", ColumnType.FLOAT)
+ .build();
+
+ testIngestQuery().setSql(
+ " REPLACE INTO foo OVERWRITE WHERE __time >= ? AND
__time < ? "
+ + "SELECT __time, m1 "
+ + "FROM foo "
+ + "WHERE __time >= ? AND __time < ? "
+ + "PARTITIONED by DAY ")
+ .setDynamicParameters(ImmutableList.of(
+
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP,
DateTimes.of("2000-01-02").getMillis()),
+
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP,
DateTimes.of("2000-01-03").getMillis()),
+
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP,
DateTimes.of("2000-01-02").getMillis()),
+
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP,
DateTimes.of("2000-01-03").getMillis())
+ ))
+ .setExpectedDataSource("foo")
+
.setExpectedDestinationIntervals(ImmutableList.of(Intervals.of(
+ "2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")))
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(context)
+ .setExpectedSegments(ImmutableSet.of(SegmentId.of(
+ "foo",
+ Intervals.of("2000-01-02T/P1D"),
+ "test",
+ 0
+ )))
+ .setExpectedResultRows(ImmutableList.of(new
Object[]{946771200000L, 2.0f}))
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().totalFiles(1),
+ 0, 0, "input0"
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().rows(1).frames(1),
+ 0, 0, "shuffle"
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().rows(1).frames(1),
+ 1, 0, "input0"
+ )
+
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+ CounterSnapshotMatcher
+ .with().segmentRowsProcessed(1),
+ 1, 0
+ )
+ .setExpectedLastCompactionState(
+ expectedCompactionState(
+ context,
+ Collections.emptyList(),
+ Collections.singletonList(new
FloatDimensionSchema("m1")),
+ GranularityType.DAY,
+ Intervals.of("2000-01-02T/P1D")
+ )
+ )
+ .verifyResults();
+ }
+
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFoo1WithAllExtern(String contextName, Map<String,
Object> context) throws IOException
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index a5822f3a0b7..91a1983bfd6 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
@@ -409,6 +411,53 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testSelectWithDynamicParameters(String contextName, Map<String,
Object> context)
+ {
+ RowSignature resultSignature = RowSignature.builder()
+ .add("cnt", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .build();
+
+ // Filter [__time >= timestamp '3000-01-01 00:00:00'] matches no segments
at all.
+ testSelectQuery()
+ .setSql("select cnt,dim1 from foo where __time >= ?")
+ .setExpectedMSQSpec(
+ MSQSpec.builder()
+ .query(
+ newScanQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(
+ querySegmentSpec(
+ Intervals.utc(
+ DateTimes.of("3000").getMillis(),
+ Intervals.ETERNITY.getEndMillis()
+ )
+ )
+ )
+ .columns("cnt", "dim1")
+ .context(defaultScanQueryContext(context,
resultSignature))
+ .build()
+ )
+ .columnMappings(ColumnMappings.identity(resultSignature))
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .destination(isDurableStorageDestination(contextName,
context)
+ ? DurableStorageMSQDestination.INSTANCE
+ : TaskReportMSQDestination.INSTANCE)
+ .build()
+ )
+ .setDynamicParameters(
+ ImmutableList.of(
+ TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP,
DateTimes.of("3000-01-01").getMillis())
+ )
+ )
+ .setQueryContext(context)
+ .setExpectedRowSignature(resultSignature)
+ .setExpectedResultRows(ImmutableList.of())
+ .verifyResults();
+ }
+
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectOnFooWhereMatchesNoData(String contextName,
Map<String, Object> context)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index e1ce49d8292..761a61337ea 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -36,6 +36,7 @@ import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.util.Modules;
import com.google.inject.util.Providers;
+import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
@@ -217,7 +218,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -784,13 +784,13 @@ public class MSQTestBase extends BaseCalciteQueryTest
);
}
- private String runMultiStageQuery(String query, Map<String, Object> context)
+ private String runMultiStageQuery(String query, Map<String, Object> context,
List<TypedValue> parameters)
{
final DirectStatement stmt = sqlStatementFactory.directStatement(
new SqlQueryPlus(
query,
context,
- Collections.emptyList(),
+ parameters,
CalciteTests.REGULAR_USER_AUTH_RESULT
)
);
@@ -886,6 +886,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected String sql = null;
protected MSQControllerTask taskSpec = null;
protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
+ protected List<TypedValue> dynamicParameters = new ArrayList<>();
protected List<MSQResultsReport.ColumnAndType> expectedRowSignature = null;
protected MSQSpec expectedMSQSpec = null;
protected MSQTuningConfig expectedTuningConfig = null;
@@ -924,6 +925,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
+ public Builder setDynamicParameters(List<TypedValue> dynamicParameters)
+ {
+ this.dynamicParameters = dynamicParameters;
+ return asBuilder();
+ }
+
public Builder
setExpectedRowSignature(List<MSQResultsReport.ColumnAndType>
expectedRowSignature)
{
Preconditions.checkArgument(!expectedRowSignature.isEmpty(), "Row
signature cannot be empty");
@@ -1057,7 +1064,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
final Throwable e = Assert.assertThrows(
Throwable.class,
- () -> runMultiStageQuery(sql, queryContext)
+ () -> runMultiStageQuery(sql, queryContext, dynamicParameters)
);
assertThat(e, expectedValidationErrorMatcher);
@@ -1209,7 +1216,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
String controllerId;
if (sql != null) {
// Run the sql command.
- controllerId = runMultiStageQuery(sql, queryContext);
+ controllerId = runMultiStageQuery(sql, queryContext,
dynamicParameters);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
@@ -1426,7 +1433,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
try {
String controllerId;
if (sql != null) {
- controllerId = runMultiStageQuery(sql, queryContext);
+ controllerId = runMultiStageQuery(sql, queryContext,
dynamicParameters);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
@@ -1468,7 +1475,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
Preconditions.checkArgument(sql == null || queryContext != null,
"queryContext cannot be null");
try {
- String controllerId = runMultiStageQuery(sql, queryContext);
+ String controllerId = runMultiStageQuery(sql, queryContext,
dynamicParameters);
if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index cf1d22eb39b..03ef94656c5 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -64,7 +64,6 @@ import java.util.function.Function;
*/
public class DruidPlanner implements Closeable
{
-
public static final Joiner SPACE_JOINER = Joiner.on(" ");
public static final Joiner COMMA_JOINER = Joiner.on(", ");
@@ -148,6 +147,7 @@ public class DruidPlanner implements Closeable
catch (SqlParseException e1) {
throw translateException(e1);
}
+ root = rewriteParameters(root);
hook.captureSqlNode(root);
handler = createHandler(root);
handler.validate();
@@ -158,6 +158,7 @@ public class DruidPlanner implements Closeable
private SqlStatementHandler createHandler(final SqlNode node)
{
SqlNode query = node;
+
SqlExplain explain = null;
if (query.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) query;
@@ -179,6 +180,27 @@ public class DruidPlanner implements Closeable
throw InvalidSqlInput.exception("Unsupported SQL statement [%s]",
node.getKind());
}
+ /**
+ * Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap
out any
+ * {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link
org.apache.calcite.sql.SqlLiteral}
+ * replacement.
+ *
+ * @return a rewritten {@link SqlNode} with any dynamic parameters rewritten
in the provided {@code original} node,
+ * if they were present.
+ */
+ private SqlNode rewriteParameters(final SqlNode original)
+ {
+ // Parameter replacement is done only if the client provides parameter
values.
+ // If this is a PREPARE-only, then there will be no values even if the
statement contains
+ // parameters. If this is a PLAN, then we'll catch later the case that the
statement
+ // contains parameters, but no values were provided.
+ if (plannerContext.getParameters().isEmpty()) {
+ return original;
+ } else {
+ return original.accept(new SqlParameterizerShuttle(plannerContext)); //
the rewrite happens here.
+ }
+ }
+
/**
* Prepare a SQL query for execution, including some initial parsing and
* validation and any dynamic parameter type resolution, to support prepared
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
index 92f2ef2ea81..e0b5ffdb08e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
@@ -341,7 +341,6 @@ public abstract class IngestHandler extends QueryHandler
protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode)
{
SqlNode query = convertSourceQuery(sqlNode);
-
return DruidSqlReplace.create(
new SqlInsert(
sqlNode.getParserPosition(),
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
index dfc374f51bb..a915833cd3f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
@@ -113,7 +113,7 @@ public abstract class QueryHandler extends
SqlStatementHandler.BaseStatementHand
CalcitePlanner planner = handlerContext.planner();
SqlNode validatedQueryNode;
try {
- validatedQueryNode = planner.validate(rewriteParameters(root));
+ validatedQueryNode = planner.validate(root);
}
catch (ValidationException e) {
throw DruidPlanner.translateException(e);
@@ -129,24 +129,6 @@ public abstract class QueryHandler extends
SqlStatementHandler.BaseStatementHand
return validatedQueryNode;
}
- private SqlNode rewriteParameters(SqlNode original)
- {
- // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap
out any
- // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link
SqlLiteral}
- // replacement.
- //
- // Parameter replacement is done only if the client provides parameter
values.
- // If this is a PREPARE-only, then there will be no values even if the
statement contains
- // parameters. If this is a PLAN, then we'll catch later the case that the
statement
- // contains parameters, but no values were provided.
- PlannerContext plannerContext = handlerContext.plannerContext();
- if (plannerContext.getParameters().isEmpty()) {
- return original;
- } else {
- return original.accept(new SqlParameterizerShuttle(plannerContext));
- }
- }
-
@Override
public void prepare()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]