[ 
https://issues.apache.org/jira/browse/BEAM-11872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298452#comment-17298452
 ] 

Andrew Pilloud edited comment on BEAM-11872 at 3/10/21, 12:39 AM:
------------------------------------------------------------------

To implement this, we need a "BeamCollectRel", the inverse of this: 
[https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java]
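Semantically, COLLECT is an aggregate that folds every input row's value into a single array, i.e. the inverse of UNNEST, which expands one array into one row per element. A minimal plain-Java sketch of that round trip (the class and method names here are illustrative only, not Beam or Calcite APIs):
{code:java}
import java.util.List;
import java.util.stream.Collectors;

public class CollectSketch {
  // A row with a single INT32 field, standing in for Beam's schema'd Row.
  record Row(int fInt) {}

  // COLLECT: fold every input row's f_int into one array,
  // like ARRAY(SELECT f_int FROM PCOLLECTION).
  static List<Integer> collectFInt(List<Row> rows) {
    return rows.stream().map(Row::fInt).collect(Collectors.toList());
  }

  // UNNEST: the inverse -- expand one array into one row per element.
  static List<Row> unnest(List<Integer> array) {
    return array.stream().map(Row::new).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Row> pcollection = List.of(new Row(1), new Row(2));
    List<Integer> result = collectFInt(pcollection);
    System.out.println(result);                              // [1, 2]
    System.out.println(unnest(result).equals(pcollection));  // true: round trip
  }
}
{code}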

To reproduce, I added the following test to 
./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
{code:java}
  @Test
  public void testCollectArrayField() {
    PCollection<Row> input = pCollectionOf2Elements();

    Schema resultType =
        Schema.builder()
            .addArrayField("f_int", Schema.FieldType.INT32)
            .build();

    PCollection<Row> result =
        input.apply("sqlQuery", SqlTransform.query("SELECT ARRAY(SELECT f_int FROM PCOLLECTION)"));

    PAssert.that(result)
        .containsInAnyOrder(Row.withSchema(resultType).addArray(Arrays.asList(1, 2)).build());

    pipeline.run();
  }
{code}
The error is:
{code:java}
org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to convert query SELECT ARRAY(SELECT f_int FROM PCOLLECTION)
        at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:197)
        at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:114)
        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:172)
        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
        at org.apache.beam.sdk.extensions.sql.BeamSqlDslArrayTest.testCollectArrayField(BeamSqlDslArrayTest.java:103)
...
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL.
Missing conversion is Collect[convention: NONE -> BEAM_LOGICAL]
There is 1 empty subset: rel#24008:Subset#3.BEAM_LOGICAL, the relevant part of the original plan is as follows
23957:Collect(field=[EXPR$0])
  23955:LogicalProject(subset=[rel#23956:Subset#2.NONE], f_int=[$0])
    23948:BeamIOSourceRel(subset=[rel#23954:Subset#1.BEAM_LOGICAL], table=[[beam, PCOLLECTION]])
...
{code}



> Support COLLECT operator
> ------------------------
>
>                 Key: BEAM-11872
>                 URL: https://issues.apache.org/jira/browse/BEAM-11872
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Andrew Pilloud
>            Priority: P2
>
> User would like the ARRAY(SELECT ...) operator, which becomes COLLECT.
> Simple example:
> {code:java}
> SELECT ARRAY(SELECT f_int FROM PCOLLECTION)
> {code}
> We also want to make sure the original (more complex) example works:
> {code:java}
> SELECT
>   otherScalarField,
>   ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
> FROM
>   table
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
