This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 03f4f797953 [SQL] Support positional parameters (#38880)
03f4f797953 is described below
commit 03f4f797953c676417753a92eadd7191054bc0d8
Author: Yi Hu <[email protected]>
AuthorDate: Thu Jun 11 15:58:37 2026 -0400
[SQL] Support positional parameters (#38880)
---
.../extensions/sql/impl/CalciteQueryPlanner.java | 93 +++++++++++++++++++---
.../{BeamSqlAliasTest => BeamSqlAliasTest.java} | 12 +--
.../extensions/sql/BeamSqlDslParametersTest.java | 78 ++++++++++++++++++
3 files changed, 168 insertions(+), 15 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index 606a3c5f71a..aa6f4d12187 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -51,6 +51,11 @@ import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.Me
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexDynamicParam;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexShuttle;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable;
@@ -180,8 +185,8 @@ public class CalciteQueryPlanner implements QueryPlanner {
public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters)
throws ParseException, SqlConversionException {
Preconditions.checkArgument(
- queryParameters.getKind() == Kind.NONE,
- "Beam SQL Calcite dialect does not yet support query parameters.");
+ queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == Kind.POSITIONAL,
+ "Beam SQL Calcite dialect only supports positional query parameters.");
BeamRelNode beamRelNode;
try {
SqlNode parsed = planner.parse(sqlStatement);
@@ -191,28 +196,35 @@ public class CalciteQueryPlanner implements QueryPlanner {
// root of original logical plan
RelRoot root = planner.rel(validated);
+ RelNode relNode = root.rel;
+ if (queryParameters.getKind() == Kind.POSITIONAL) {
+ relNode =
+ bindParameters(
+ relNode,
+ new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters));
+ }
LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
RelTraitSet desiredTraits =
- root.rel
+ relNode
.getTraitSet()
.replace(BeamLogicalConvention.INSTANCE)
.replace(root.collation)
.simplify();
// beam physical plan
- root.rel
+ relNode
.getCluster()
.setMetadataProvider(
ChainedRelMetadataProvider.of(
ImmutableList.of(
NonCumulativeCostImpl.SOURCE,
RelMdNodeStats.SOURCE,
- root.rel.getCluster().getMetadataProvider())));
+ relNode.getCluster().getMetadataProvider())));
- root.rel.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance);
+ relNode.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance);
RelMetadataQuery.THREAD_PROVIDERS.set(
- JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
- root.rel.getCluster().invalidateMetadataQuery();
- beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, root.rel);
+ JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider()));
+ relNode.getCluster().invalidateMetadataQuery();
+ beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode);
LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode));
} catch (RelConversionException | CannotPlanException e) {
throw new SqlConversionException(
@@ -225,6 +237,15 @@ public class CalciteQueryPlanner implements QueryPlanner {
return beamRelNode;
}
+ private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
+ RelNode newRel = rel.accept(binder);
+ java.util.List<RelNode> newInputs = new java.util.ArrayList<>();
+ for (RelNode input : newRel.getInputs()) {
+ newInputs.add(bindParameters(input, binder));
+ }
+ return newRel.copy(newRel.getTraitSet(), newInputs);
+ }
+
// It needs to be public so that the generated code in Calcite can access it.
public static class NonCumulativeCostImpl
implements MetadataHandler<BuiltInMetadata.NonCumulativeCost> {
@@ -265,4 +286,58 @@ public class CalciteQueryPlanner implements QueryPlanner {
return ((BeamRelNode) rel).beamComputeSelfCost(rel.getCluster().getPlanner(), bmq);
}
}
+
+ private static class ParameterBinder extends RexShuttle {
+ private final RexBuilder rexBuilder;
+ private final List<?> positionalParams;
+
+ ParameterBinder(RexBuilder rexBuilder, QueryParameters params) {
+ this.rexBuilder = rexBuilder;
+ this.positionalParams = params.getKind() == Kind.POSITIONAL ? params.positional() : null;
+ }
+
+ @Override
+ public RexNode visitDynamicParam(RexDynamicParam dynamicParam) {
+ if (positionalParams != null) {
+ int index = dynamicParam.getIndex();
+ if (index < 0 || index >= positionalParams.size()) {
+ throw new IllegalArgumentException(
+ "Index out of bounds for positional parameter: " + index);
+ }
+ Object val = positionalParams.get(index);
+ return makeLiteral(cleanValue(val), dynamicParam.getType());
+ }
+ return super.visitDynamicParam(dynamicParam);
+ }
+
+ private RexNode makeLiteral(Object val, RelDataType type) {
+ if (val == null) {
+ return rexBuilder.makeNullLiteral(type);
+ }
+ return rexBuilder.makeLiteral(val, type, true);
+ }
+
+ @SuppressWarnings("JavaUtilDate") // explicit java.util.Date support
+ private Object cleanValue(Object value) {
+ if (value instanceof org.joda.time.ReadableInstant) {
+ return ((org.joda.time.ReadableInstant) value).getMillis();
+ }
+ if (value instanceof java.time.LocalDate) {
+ return (int) ((java.time.LocalDate) value).toEpochDay();
+ }
+ if (value instanceof java.time.LocalTime) {
+ return (int) (((java.time.LocalTime) value).toNanoOfDay() / 1_000_000L);
+ }
+ if (value instanceof java.time.LocalDateTime) {
+ return ((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.UTC).toEpochMilli();
+ }
+ if (value instanceof java.sql.Timestamp) {
+ return ((java.sql.Timestamp) value).getTime();
+ }
+ if (value instanceof java.util.Date) {
+ return ((java.util.Date) value).getTime();
+ }
+ return value;
+ }
+ }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java
similarity index 92%
rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest
rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java
index 790312b7e75..de3c8e6f301 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.extensions.sql;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
@@ -33,8 +35,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.junit.Rule;
import org.junit.Test;
-import org.testcontainers.shaded.com.fasterxml.jackson.databind.MapperFeature;
-import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
public class BeamSqlAliasTest implements Serializable {
@@ -42,10 +42,10 @@ public class BeamSqlAliasTest implements Serializable {
@Test
public void testSqlWithAliasIsNotIgnoredWithOptimizers() {
- String ID = "id";
- String EVENT = "event";
+ final String id = "id";
+ final String event = "event";
- Schema inputType = Schema.builder().addStringField(ID).addStringField(EVENT).build();
+ Schema inputType = Schema.builder().addStringField(id).addStringField(event).build();
String sql =
"select event as event_name, count(*) as c\n" + "from PCOLLECTION\n" + "group by event";
@@ -91,4 +91,4 @@ public class BeamSqlAliasTest implements Serializable {
pipeline.run().waitUntilFinish();
}
-}
\ No newline at end of file
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java
new file mode 100644
index 00000000000..9166fd16e0a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for query parameters in Beam SQL. */
+public class BeamSqlDslParametersTest extends BeamSqlDslBase {
+
+ @Test
+ public void testPositionalParameters() {
+ String sql = "SELECT f_int, f_string FROM PCOLLECTION WHERE f_int = ? AND f_string = ?";
+
+ PCollection<Row> result =
+ boundedInput1.apply(
+ "testPositionalParameters",
+ SqlTransform.query(sql).withPositionalParameters(Arrays.asList(1, "string_row1")));
+
+ Row expectedRow =
+ Row.withSchema(Schema.builder().addInt32Field("f_int").addStringField("f_string").build())
+ .addValues(1, "string_row1")
+ .build();
+
+ PAssert.that(result).containsInAnyOrder(expectedRow);
+
+ pipeline.run();
+ }
+
+ @Test
+ public void testDateTimeParameters() {
+ String sql =
+ "SELECT f_int FROM PCOLLECTION WHERE f_date = ? AND f_time = ? AND f_datetime = ? AND f_timestamp = ?";
+
+ PCollection<Row> result =
+ boundedInput1.apply(
+ "testDateTimeParameters",
+ SqlTransform.query(sql)
+ .withPositionalParameters(
+ Arrays.asList(
+ LocalDate.of(2017, 1, 1),
+ LocalTime.of(1, 1, 3),
+ LocalDateTime.of(2017, 1, 1, 1, 1, 3),
+ new Instant(parseTimestampWithoutTimeZone("2017-01-01 01:01:03")))));
+
+ Row expectedRow =
+ Row.withSchema(Schema.builder().addInt32Field("f_int").build()).addValues(1).build();
+
+ PAssert.that(result).containsInAnyOrder(expectedRow);
+
+ pipeline.run();
+ }
+}