This is an automated email from the ASF dual-hosted git repository.
apilloud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7f66ea2 [BEAM-9658] Plumb through WITH OFFSET
new 7c22589 Merge pull request #11540 from apilloud/9658
7f66ea2 is described below
commit 7f66ea25417d70dff9982a8b1c88795f30642e73
Author: Andrew Pilloud <[email protected]>
AuthorDate: Thu Apr 23 16:09:30 2020 -0700
[BEAM-9658] Plumb through WITH OFFSET
---
.../translation/ArrayScanToJoinConverter.java | 30 ++++++++++++++++------
.../translation/ArrayScanToUncollectConverter.java | 9 +++++--
.../sql/zetasql/ZetaSQLDialectSpecTest.java | 10 ++++++++
3 files changed, 39 insertions(+), 10 deletions(-)
diff --git
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
index cd7a4fd..d222dac 100644
---
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
+++
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
@@ -29,6 +29,7 @@ import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRe
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Uncollect;
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalJoin;
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalProject;
+import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
@@ -80,14 +81,27 @@ class ArrayScanToJoinConverter extends
RelConverter<ResolvedArrayScan> {
LogicalProject.create(createOneRow(getCluster()), projects,
ImmutableList.of(columnName));
// Create an UnCollect
- // TODO: how to handle ordinality.
- Uncollect uncollectNode = Uncollect.create(projectNode.getTraitSet(),
projectNode, false);
- // The InputRef should only be 0 because Uncollect has only one field.
- RelNode rightInput =
- LogicalProject.create(
- uncollectNode,
-
ImmutableList.of(getCluster().getRexBuilder().makeInputRef(uncollectNode, 0)),
- ImmutableList.of(columnName));
+ boolean ordinality = (zetaNode.getArrayOffsetColumn() != null);
+
+ // These asserts guaranteed by the parser code, but not the data structure.
+ // If they aren't true we need the Project to reorder columns.
+ assert zetaNode.getElementColumn().getId() == 1;
+ assert !ordinality || zetaNode.getArrayOffsetColumn().getColumn().getId()
== 2;
+ Uncollect uncollectNode = Uncollect.create(projectNode.getTraitSet(),
projectNode, ordinality);
+
+ List<RexInputRef> rightProjects = new ArrayList<>();
+ List<String> rightNames = new ArrayList<>();
+ rightProjects.add(getCluster().getRexBuilder().makeInputRef(uncollectNode,
0));
+ rightNames.add(columnName);
+ if (ordinality) {
+
rightProjects.add(getCluster().getRexBuilder().makeInputRef(uncollectNode, 1));
+ rightNames.add(
+ String.format(
+ zetaNode.getArrayOffsetColumn().getColumn().getTableName(),
+ zetaNode.getArrayOffsetColumn().getColumn().getName()));
+ }
+
+ RelNode rightInput = LogicalProject.create(uncollectNode, rightProjects,
rightNames);
// Join condition should be a RexNode converted from join_expr.
RexNode condition =
diff --git
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
index 21eee28..87d777ff 100644
---
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
+++
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
@@ -54,7 +54,12 @@ class ArrayScanToUncollectConverter extends
RelConverter<ResolvedArrayScan> {
Collections.singletonList(arrayLiteralExpression),
ImmutableList.of(fieldName));
- // TODO: how to handle ordinarily.
- return Uncollect.create(projectNode.getTraitSet(), projectNode, false);
+ boolean ordinality = (zetaNode.getArrayOffsetColumn() != null);
+
+ // These asserts guaranteed by the parser code, but not the data structure.
+ // If they aren't true we need to add a Project to reorder columns.
+ assert zetaNode.getElementColumn().getId() == 1;
+ assert !ordinality || zetaNode.getArrayOffsetColumn().getColumn().getId()
== 2;
+ return Uncollect.create(projectNode.getTraitSet(), projectNode,
ordinality);
}
}
diff --git
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 6d4eb3e..26e34ca 100644
---
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -2981,6 +2981,16 @@ public class ZetaSQLDialectSpecTest {
}
@Test
+ public void testNamedUNNESTLiteralOffset() {
+ String sql = "SELECT x, p FROM UNNEST([3, 4]) AS x WITH OFFSET p";
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+ thrown.expect(UnsupportedOperationException.class);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
+ }
+
+ @Test
@Ignore("Seeing exception in Beam, need further investigation on the cause
of this failed query.")
public void testNamedUNNESTJoin() {
String sql =