[
https://issues.apache.org/jira/browse/BEAM-11872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298452#comment-17298452
]
Andrew Pilloud edited comment on BEAM-11872 at 3/10/21, 12:39 AM:
------------------------------------------------------------------
To implement this we need a "BeamCollectRel", the inverse of this:
[https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java]
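For context, COLLECT is the aggregate counterpart of UNCOLLECT: UNCOLLECT (UNNEST) flattens one array-valued row into many rows, while COLLECT gathers many rows back into one array-valued row. A minimal plain-Java sketch of those semantics (no Beam or Calcite dependencies; the class and method names here are hypothetical, for illustration only):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class CollectSemantics {
  // COLLECT / ARRAY(SELECT ...): many single-field rows become one row
  // holding a single array field.
  static List<List<Integer>> collect(List<Integer> rows) {
    return Collections.singletonList(rows);
  }

  // UNCOLLECT / UNNEST: one array-valued row becomes one row per element.
  static List<Integer> uncollect(List<List<Integer>> arrayRows) {
    return arrayRows.stream().flatMap(List::stream).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Integer> rows = Arrays.asList(1, 2); // the f_int values from the test below
    System.out.println(collect(rows));            // [[1, 2]]
    System.out.println(uncollect(collect(rows))); // [1, 2] -- round trip
  }
}
```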
To reproduce, I added the following test to
./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
{code:java}
@Test
public void testCollectArrayField() {
  PCollection<Row> input = pCollectionOf2Elements();

  Schema resultType =
      Schema.builder().addArrayField("f_int", Schema.FieldType.INT32).build();

  PCollection<Row> result =
      input.apply(
          "sqlQuery", SqlTransform.query("SELECT ARRAY(SELECT f_int FROM PCOLLECTION)"));

  PAssert.that(result)
      .containsInAnyOrder(Row.withSchema(resultType).addArray(Arrays.asList(1, 2)).build());

  pipeline.run();
}
{code}
The resulting error is:
{code:java}
org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to convert query SELECT ARRAY(SELECT f_int FROM PCOLLECTION)
  at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:197)
  at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:114)
  at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:172)
  at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
  at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
  at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
  at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
  at org.apache.beam.sdk.extensions.sql.BeamSqlDslArrayTest.testCollectArrayField(BeamSqlDslArrayTest.java:103)
  ...
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL.
Missing conversion is Collect[convention: NONE -> BEAM_LOGICAL]
There is 1 empty subset: rel#24008:Subset#3.BEAM_LOGICAL, the relevant part of the original plan is as follows
23957:Collect(field=[EXPR$0])
  23955:LogicalProject(subset=[rel#23956:Subset#2.NONE], f_int=[$0])
    23948:BeamIOSourceRel(subset=[rel#23954:Subset#1.BEAM_LOGICAL], table=[[beam, PCOLLECTION]])
...
{code}
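The missing conversion in the plan above (Collect[convention: NONE -> BEAM_LOGICAL]) suggests the fix is a converter rule plus the new rel node. A rough pseudocode sketch modeled on the existing BeamUncollectRule; BeamCollectRule and BeamCollectRel are hypothetical names, and the constructor arguments are untested guesses, not a working implementation:
{code:java}
// Pseudocode sketch only -- mirrors BeamUncollectRule, not tested.
public class BeamCollectRule extends ConverterRule {
  public static final BeamCollectRule INSTANCE = new BeamCollectRule();

  private BeamCollectRule() {
    super(Collect.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamCollectRule");
  }

  @Override
  public RelNode convert(RelNode rel) {
    Collect collect = (Collect) rel;
    RelNode input =
        convert(
            collect.getInput(),
            collect.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
    // The hypothetical BeamCollectRel would hold the PTransform that
    // aggregates the input rows into a single array-valued row.
    return new BeamCollectRel(
        collect.getCluster(),
        collect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        input);
  }
}
{code}
The rule would presumably also need to be registered alongside the other logical conversion rules in BeamRuleSets.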
> Support COLLECT operator
> ------------------------
>
> Key: BEAM-11872
> URL: https://issues.apache.org/jira/browse/BEAM-11872
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Andrew Pilloud
> Priority: P2
>
> A user would like the ARRAY(SELECT ...) operator, which Calcite translates into a COLLECT relational node.
> Simple example:
> {code:java}
> SELECT ARRAY(SELECT f_int FROM PCOLLECTION)
> {code}
> We also want to make sure the original (more complex) example works:
> {code:java}
> SELECT
>   otherScalarField,
>   ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
> FROM
>   table
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)