This is an automated email from the ASF dual-hosted git repository.

apilloud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f66ea2  [BEAM-9658] Plumb through WITH OFFSET
     new 7c22589  Merge pull request #11540 from apilloud/9658
7f66ea2 is described below

commit 7f66ea25417d70dff9982a8b1c88795f30642e73
Author: Andrew Pilloud <[email protected]>
AuthorDate: Thu Apr 23 16:09:30 2020 -0700

    [BEAM-9658] Plumb through WITH OFFSET
---
 .../translation/ArrayScanToJoinConverter.java      | 30 ++++++++++++++++------
 .../translation/ArrayScanToUncollectConverter.java |  9 +++++--
 .../sql/zetasql/ZetaSQLDialectSpecTest.java        | 10 ++++++++
 3 files changed, 39 insertions(+), 10 deletions(-)

diff --git 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
index cd7a4fd..d222dac 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
@@ -29,6 +29,7 @@ import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRe
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Uncollect;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalJoin;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalProject;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
@@ -80,14 +81,27 @@ class ArrayScanToJoinConverter extends 
RelConverter<ResolvedArrayScan> {
         LogicalProject.create(createOneRow(getCluster()), projects, 
ImmutableList.of(columnName));
 
     // Create an UnCollect
-    // TODO: how to handle ordinality.
-    Uncollect uncollectNode = Uncollect.create(projectNode.getTraitSet(), 
projectNode, false);
-    // The InputRef should only be 0 because Uncollect has only one field.
-    RelNode rightInput =
-        LogicalProject.create(
-            uncollectNode,
-            
ImmutableList.of(getCluster().getRexBuilder().makeInputRef(uncollectNode, 0)),
-            ImmutableList.of(columnName));
+    boolean ordinality = (zetaNode.getArrayOffsetColumn() != null);
+
+    // These asserts guaranteed by the parser code, but not the data structure.
+    // If they aren't true we need the Project to reorder columns.
+    assert zetaNode.getElementColumn().getId() == 1;
+    assert !ordinality || zetaNode.getArrayOffsetColumn().getColumn().getId() 
== 2;
+    Uncollect uncollectNode = Uncollect.create(projectNode.getTraitSet(), 
projectNode, ordinality);
+
+    List<RexInputRef> rightProjects = new ArrayList<>();
+    List<String> rightNames = new ArrayList<>();
+    rightProjects.add(getCluster().getRexBuilder().makeInputRef(uncollectNode, 
0));
+    rightNames.add(columnName);
+    if (ordinality) {
+      
rightProjects.add(getCluster().getRexBuilder().makeInputRef(uncollectNode, 1));
+      rightNames.add(
+          String.format(
+              zetaNode.getArrayOffsetColumn().getColumn().getTableName(),
+              zetaNode.getArrayOffsetColumn().getColumn().getName()));
+    }
+
+    RelNode rightInput = LogicalProject.create(uncollectNode, rightProjects, 
rightNames);
 
     // Join condition should be a RexNode converted from join_expr.
     RexNode condition =
diff --git 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
index 21eee28..87d777ff 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
@@ -54,7 +54,12 @@ class ArrayScanToUncollectConverter extends 
RelConverter<ResolvedArrayScan> {
             Collections.singletonList(arrayLiteralExpression),
             ImmutableList.of(fieldName));
 
-    // TODO: how to handle ordinarily.
-    return Uncollect.create(projectNode.getTraitSet(), projectNode, false);
+    boolean ordinality = (zetaNode.getArrayOffsetColumn() != null);
+
+    // These asserts guaranteed by the parser code, but not the data structure.
+    // If they aren't true we need to add a Project to reorder columns.
+    assert zetaNode.getElementColumn().getId() == 1;
+    assert !ordinality || zetaNode.getArrayOffsetColumn().getColumn().getId() 
== 2;
+    return Uncollect.create(projectNode.getTraitSet(), projectNode, 
ordinality);
   }
 }
diff --git 
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 6d4eb3e..26e34ca 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -2981,6 +2981,16 @@ public class ZetaSQLDialectSpecTest {
   }
 
   @Test
+  public void testNamedUNNESTLiteralOffset() {
+    String sql = "SELECT x, p FROM UNNEST([3, 4]) AS x WITH OFFSET p";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+    thrown.expect(UnsupportedOperationException.class);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+  }
+
+  @Test
   @Ignore("Seeing exception in Beam, need further investigation on the cause 
of this failed query.")
   public void testNamedUNNESTJoin() {
     String sql =

Reply via email to