Hi Talat,

I managed to turn your test case into a reproduction against plain Calcite. It
looks like there is a bug affecting tables that contain one or more
single-element structs and no multi-element structs. I've sent the
details to the Calcite mailing list here:
https://lists.apache.org/thread/tlr9hsmx09by79h91nwp2d4nv8jfwsto
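
To make the condition concrete, here is a rough sketch in plain Java of the
table shape that seems to trigger it. This is not Beam or Calcite code: the
StructShapeCheck class, the Column type, and matchesReportedShape are names
made up for illustration, and the rule it encodes is only my reading of the
behavior so far.

```java
import java.util.List;

/** Hypothetical model of the table shape described above; not Beam or Calcite code. */
final class StructShapeCheck {

  /** A column is a scalar (fieldCount == 0) or a struct with fieldCount nested fields. */
  static final class Column {
    final String name;
    final int fieldCount;

    Column(String name, int fieldCount) {
      this.name = name;
      this.fieldCount = fieldCount;
    }
  }

  /**
   * Returns true when the table has at least one single-element struct column
   * and no struct column with two or more elements.
   */
  static boolean matchesReportedShape(List<Column> columns) {
    boolean hasSingleElementStruct = false;
    for (Column column : columns) {
      if (column.fieldCount > 1) {
        return false; // a multi-element struct is present, so the shape does not match
      }
      if (column.fieldCount == 1) {
        hasSingleElementStruct = true;
      }
    }
    return hasSingleElementStruct;
  }

  public static void main(String[] args) {
    // Shapes taken from the two test tables discussed in this thread.
    List<Column> panwRowTestTable =
        List.of(new Column("user_info", 1), new Column("id", 0), new Column("value", 0));
    List<Column> basicRowTestTable = List.of(new Column("col", 2), new Column("field", 0));

    System.out.println(matchesReportedShape(panwRowTestTable));
    System.out.println(matchesReportedShape(basicRowTestTable));
  }
}
```

With the shapes from the two tests in this thread, panwRowTestTable (user_info
has a single field) matches and basicRowTestTable (col has two fields) does not.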

I'm experimenting with ideas on how to work around this but a fix will
likely require a Calcite upgrade, which is not something I'd have time
to help with. (I'm not on the Google Beam team anymore.)

Andrew

On Wed, Feb 22, 2023 at 12:18 PM Talat Uyarer
<tuya...@paloaltonetworks.com> wrote:
>
> Hi @Andrew Pilloud
>
> Sorry for the late response. Yes, your test works fine. I changed the 
> test input structure to match our input structure, and now this test also 
> hits the same exception.
>
> Feb 21, 2023 2:02:28 PM 
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> INFO: SQL:
> WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`, 
> `panwRowTestTable`.`id`, `panwRowTestTable`.`value`
> FROM `beam`.`panwRowTestTable` AS `panwRowTestTable`
> WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT 
> `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value`
> FROM `tempTable` AS `tempTable`)
> Feb 21, 2023 2:02:28 PM 
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> INFO: SQLPlan>
> LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2])
>   LogicalFilter(condition=[=($0.name, 'innerStr')])
>     LogicalProject(name=[$0.name], id=[$1], value=[$2])
>       BeamIOSourceRel(table=[[beam, panwRowTestTable]])
>
>
> fieldList must not be null, type = VARCHAR
> java.lang.AssertionError: fieldList must not be null, type = VARCHAR
>
> I don't know what is different from yours. I am sharing my version of the test 
> as well.
>
>
> Index: 
> sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> IDEA additional info:
> Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> <+>UTF-8
> ===================================================================
> diff --git 
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>  
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> --- 
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>  (revision fd383fae1adc545b6b6a22b274902cda956fec49)
> +++ 
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>  (date 1677017032324)
> @@ -54,6 +54,9 @@
>    private static final Schema innerRowSchema =
>        
> Schema.builder().addStringField("string_field").addInt64Field("long_field").build();
>
> +  private static final Schema innerPanwRowSchema =
> +          Schema.builder().addStringField("name").build();
> +
>    private static final Schema innerRowWithArraySchema =
>        Schema.builder()
>            .addStringField("string_field")
> @@ -127,8 +130,12 @@
>                                .build()))
>                .put(
>                    "basicRowTestTable",
> -                  TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
> -                      
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> +                  TestBoundedTable.of(FieldType.row(innerRowSchema), "col", 
> FieldType.INT64, "field")
> +                      
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 
> 1L))
> +                .put(
> +                  "panwRowTestTable",
> +                  TestBoundedTable.of(FieldType.row(innerPanwRowSchema), 
> "user_info", FieldType.INT64, "id", FieldType.STRING, "value")
> +                          
> .addRows(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L, 
> "some_value"))
>                .put(
>                    "rowWithArrayTestTable",
>                    TestBoundedTable.of(FieldType.row(rowWithArraySchema), 
> "col")
> @@ -219,6 +226,21 @@
>                  .build());
>      pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
>    }
> +
> +  @Test
> +  public void testBasicRowWhereField() {
> +    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> +    PCollection<Row> stream =
> +        BeamSqlRelUtils.toPCollection(
> +            pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM 
> panwRowTestTable WHERE panwRowTestTable.`user_info`.`name` = 'innerStr') 
> SELECT * FROM tempTable"));
> +    Schema outputSchema = Schema.builder().addRowField("col", 
> innerRowSchema).addInt64Field("field").build();
> +    PAssert.that(stream)
> +        .containsInAnyOrder(
> +            Row.withSchema(outputSchema)
> +                .addValues(Row.withSchema(innerRowSchema).addValues("name", 
> 1L).build(), 1L)
> +                .build());
> +    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> +  }
>
>    @Test
>    public void testArrayConstructor() {
>
>
>
>
> On Fri, Feb 10, 2023 at 6:14 PM Andrew Pilloud <apill...@google.com> wrote:
>>
>> I have a test case that I believe should reproduce this on both head and 
>> 2.43 but it ends up with a different logical plan. Can you provide your 
>> input types?
>>
>> We have a class of issues around complex types: 
>> https://github.com/apache/beam/issues/19009
>> I don't believe the "LogicalFilter(condition=[=($2.name, 'User1')])", 
>> particularly "$2.name", is something that works. In my test it seems that 
>> the planner has flattened the complex input and reproduced a ROW at the 
>> output.
>>
>>     INFO: SQLPlan>
>>     LogicalProject(col=[ROW($0, $1)], field=[$2])
>>       LogicalFilter(condition=[=($0, 'innerStr')])
>>         LogicalProject(string_field=[$0.string_field], 
>> long_field=[$0.long_field], field=[$1])
>>           BeamIOSourceRel(table=[[beam, basicRowTestTable]])
>>
>>     Feb 10, 2023 6:07:35 PM 
>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
>>     INFO: BEAMPlan>
>>     BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t0.string_field], 
>> expr#3=[$t0.long_field], expr#4=[ROW($t2, $t3)], 
>> expr#5=['innerStr':VARCHAR], expr#6=[=($t2, $t5)], col=[$t4], field=[$t1], 
>> $condition=[$t6])
>>       BeamIOSourceRel(table=[[beam, basicRowTestTable]])
>>
>> --- 
>> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>> +++ 
>> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>> @@ -127,8 +127,8 @@ public class BeamComplexTypeTest {
>>                                .build()))
>>                .put(
>>                    "basicRowTestTable",
>> -                  TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
>> -                      
>> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
>> +                  TestBoundedTable.of(FieldType.row(innerRowSchema), "col", 
>> FieldType.INT64, "field")
>> +                      
>> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 
>> 1L))
>>                .put(
>>                    "rowWithArrayTestTable",
>>                    TestBoundedTable.of(FieldType.row(rowWithArraySchema), 
>> "col")
>> @@ -220,6 +220,21 @@ public class BeamComplexTypeTest {
>>      pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
>>    }
>>
>> +  @Test
>> +  public void testBasicRowWhereField() {
>> +    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
>> +    PCollection<Row> stream =
>> +        BeamSqlRelUtils.toPCollection(
>> +            pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM 
>> basicRowTestTable WHERE basicRowTestTable.col.string_field = 'innerStr') 
>> SELECT * FROM tempTable"));
>> +    Schema outputSchema = Schema.builder().addRowField("col", 
>> innerRowSchema).addInt64Field("field").build();
>> +    PAssert.that(stream)
>> +        .containsInAnyOrder(
>> +            Row.withSchema(outputSchema)
>> +                
>> .addValues(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 
>> 1L)
>> +                .build());
>> +    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
>> +  }
>> +
>>    @Test
>>    public void testArrayConstructor() {
>>      BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
>>
>>
>> On Fri, Feb 3, 2023 at 2:06 PM Talat Uyarer <tuya...@paloaltonetworks.com> 
>> wrote:
>>>
>>> Hi Andrew,
>>>
>>> Thank you for your MR. I appreciate your help in solving the issue. I 
>>> reran our tests and they are partially passing now with your fix. However, 
>>> there is one more issue with the WITH clause.
>>>
>>> When I run the following query, Beam somehow loses the type of a column:
>>>
>>> WITH tempTable AS (SELECT * FROM PCOLLECTION WHERE 
>>> PCOLLECTION.`user_info`.`name` = 'User1') SELECT * FROM tempTable
>>>
>>> I haven't tested on Beam master. I ran with your latest patch on our code 
>>> base. This is the output:
>>>
>>> 14:00:30.095 [Test worker] INFO  
>>> o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
>>> WITH `tempTable` AS (SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`value`, 
>>> `PCOLLECTION`.`user_info`
>>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
>>> WHERE `PCOLLECTION`.`user_info`.`name` = 'User1') (SELECT `tempTable`.`id`, 
>>> `tempTable`.`value`, `tempTable`.`user_info`
>>> FROM `tempTable` AS `tempTable`)
>>> 14:00:30.106 [Test worker] DEBUG 
>>> o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting 
>>> SqlNode to RelNode
>>> LogicalProject(id=[$0], value=[$1], user_info=[$2])
>>>   LogicalFilter(condition=[=($2.name, 'User1')])
>>>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>
>>> 14:00:30.107 [Test worker] DEBUG 
>>> o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting 
>>> SqlNode to RelNode
>>> LogicalProject(id=[$0], value=[$1], user_info=[$2])
>>>   LogicalFilter(condition=[=($2.name, 'User1')])
>>>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>
>>> 14:00:30.109 [Test worker] INFO  
>>> o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQLPlan>
>>> LogicalProject(id=[$0], value=[$1], user_info=[ROW($2)])
>>>   LogicalFilter(condition=[=($2.name, 'User1')])
>>>     LogicalProject(id=[$0], value=[$1], name=[$2.name])
>>>       BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>
>>> 14:00:30.173 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
>>>  COST = {inf}
>>> 14:00:30.173 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule 
>>> [BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] rels [#27]
>>> 14:00:30.173 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118: Apply rule 
>>> [BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] to 
>>> [rel#27:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, PCOLLECTION])]
>>> 14:00:30.174 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#41 
>>> via BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)
>>> 14:00:30.175 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118 generated 1 
>>> successors: 
>>> [rel#41:BeamEnumerableConverter.ENUMERABLE(input=BeamIOSourceRel#27)]
>>> 14:00:30.175 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
>>>  COST = {inf}
>>> 14:00:30.175 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule 
>>> [ProjectToCalcRule] rels [#33]
>>> 14:00:30.175 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136: Apply rule 
>>> [ProjectToCalcRule] to 
>>> [rel#33:LogicalProject.NONE(input=RelSubset#32,inputs=0..1,exprs=[$2.name])]
>>> 14:00:30.177 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#44 
>>> via ProjectToCalcRule
>>> 14:00:30.178 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136 generated 1 
>>> successors: 
>>> [rel#44:LogicalCalc.NONE(input=RelSubset#32,expr#0..2={inputs},expr#3=$t2.name,proj#0..1={exprs},2=$t3)]
>>> 14:00:30.178 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081;
>>>  COST = {inf}
>>> 14:00:30.178 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule 
>>> [FilterToCalcRule] rels [#35]
>>> 14:00:30.178 [Test worker] DEBUG 
>>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#160: Apply rule 
>>> [FilterToCalcRule] to 
>>> [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition==($2.name, 
>>> 'User1'))]
>>>
>>> fieldList must not be null, type = VARCHAR
>>> java.lang.AssertionError: fieldList must not be null, type = VARCHAR
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.type.RelDataTypeImpl.getFieldList(RelDataTypeImpl.java:164)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.checkValid(RexFieldAccess.java:76)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.<init>(RexFieldAccess.java:64)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:208)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:911)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:894)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:94)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:161)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:113)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:896)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:894)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall.accept(RexCall.java:189)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:302)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder.addCondition(RexProgramBuilder.java:277)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.rules.FilterToCalcRule.onMatch(FilterToCalcRule.java:76)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:239)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:61)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
>>> at 
>>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:373)
>>> at 
>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:211)
>>> at 
>>> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:112)
>>> at 
>>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171)
>>> at 
>>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:110)
>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:373)
>>> at 
>>> com.paloaltonetworks.cortex.streamcompute.filter.Filter.expand(Filter.java:126)
>>> at 
>>> com.paloaltonetworks.cortex.streamcompute.filter.Filter.expand(Filter.java:49)
>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:373)
>>> at 
>>> com.paloaltonetworks.cortex.streamcompute.filter.WithClauseFilterComplexBulkTest.testIt(WithClauseFilterComplexBulkTest.java:149)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>>> Method)
>>> at 
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at 
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at 
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>> at 
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at 
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>> at 
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at 
>>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323)
>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>> at 
>>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>> at 
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>> at 
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>> at org.junit.runners.Suite.runChild(Suite.java:128)
>>> at org.junit.runners.Suite.runChild(Suite.java:27)
>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>> at 
>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>>> at 
>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>>> at 
>>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>>> at 
>>> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>>> at 
>>> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>>> Method)
>>> at 
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at 
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at 
>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>> at 
>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>> at 
>>> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>>> at 
>>> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>>> at com.sun.proxy.$Proxy5.processTestClass(Unknown Source)
>>> at 
>>> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>>> Method)
>>> at 
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at 
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at 
>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>> at 
>>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>> at 
>>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
>>> at 
>>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
>>> at 
>>> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>>> at 
>>> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>>> at 
>>> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>>> at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> at 
>>> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
>>> at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Feb 2, 2023 at 1:06 PM Andrew Pilloud <apill...@google.com> wrote:
>>>>
>>>> It looks like Calcite stopped considering field names in RelNode equality 
>>>> as of Calcite 1.22 (which we use in Beam v2.34.0+). This can result in a 
>>>> planner state where two nodes that only differ by field name are 
>>>> considered equivalent.
>>>>
>>>> I have a fix for Beam in https://github.com/apache/beam/pull/25290 and 
>>>> I'll send an email to the Calcite dev list with more details.
>>>>
>>>> Andrew
>>>>
>>>> On Fri, Jan 27, 2023 at 11:33 AM Andrew Pilloud <apill...@google.com> 
>>>> wrote:
>>>>>
>>>>> Also, this is at the very least a Beam bug. You can file a Beam issue if you 
>>>>> want, otherwise I will when I get back.
>>>>>
>>>>> Andrew
>>>>>
>>>>> On Fri, Jan 27, 2023 at 11:27 AM Andrew Pilloud <apill...@google.com> 
>>>>> wrote:
>>>>>>
>>>>>> Hi Talat,
>>>>>>
>>>>>> I did get your test case running and added some logging to 
>>>>>> RexProgramBuilder.mergePrograms. There is only one merge that occurs 
>>>>>> during the test and it has an output type of RecordType(JavaType(int) 
>>>>>> ID, JavaType(class java.lang.String) V). This does seem like the correct 
>>>>>> output name but it doesn't match the final output name, so something is 
>>>>>> still different from the Beam test case. I also modified mergePrograms 
>>>>>> to purposely corrupt the output names; that did not cause the test to 
>>>>>> fail or trip the 'assert mergedProg.getOutputRowType() == 
>>>>>> topProgram.getOutputRowType();' in mergePrograms. I could not find any 
>>>>>> Calcite unit tests for RexProgramBuilder.mergePrograms or 
>>>>>> CoreRules.CALC_MERGE rule so I think it is still probable that the 
>>>>>> problem is in this area.
>>>>>>
>>>>>> One minor issue I encountered: it took me a while to get your test case 
>>>>>> running. It doesn't appear there are any Calcite Gradle rules to run 
>>>>>> CoreQuidemTest, and constructing the classpath manually was tedious. Did 
>>>>>> I miss something?
>>>>>>
>>>>>> I'm still working on this but I'm out today and Monday, so it will 
>>>>>> probably be Wednesday before I make any more progress.
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>> On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer 
>>>>>> <tuya...@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>> Hi Andrew,
>>>>>>>
>>>>>>> Yes, this also aligns with my debugging. In my reply to Kenn you can see 
>>>>>>> a SQL test which I wrote in Calcite. Somehow Calcite 1.28 does not have 
>>>>>>> this issue.
>>>>>>>
>>>>>>> !use post
>>>>>>> !set outputformat mysql
>>>>>>>
>>>>>>> #Test aliases with with clause
>>>>>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>>>>>>> "hr"."emps"."name" as v from "hr"."emps")
>>>>>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>>>>>>> tempTable.v <> '11' ;
>>>>>>> +-----+-----------+
>>>>>>> | ID  | value     |
>>>>>>> +-----+-----------+
>>>>>>> | 100 | Bill      |
>>>>>>> | 110 | Theodore  |
>>>>>>> | 150 | Sebastian |
>>>>>>> | 200 | Eric      |
>>>>>>> +-----+-----------+
>>>>>>> (4 rows)
>>>>>>>
>>>>>>> !ok
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud <apill...@google.com> 
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Yes, that worked.
>>>>>>>>
>>>>>>>> The issue does not occur if I disable all of the following planner 
>>>>>>>> rules: CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE, 
>>>>>>>> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE), and 
>>>>>>>> BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>>>>>>>>
>>>>>>>> All the rules share a common call to RexProgramBuilder.mergePrograms, 
>>>>>>>> so I suspect the problem lies there. I spent some time looking but 
>>>>>>>> wasn't able to find it by code inspection; it looks like this code 
>>>>>>>> path is doing the right thing with names. I'll spend some time 
>>>>>>>> tomorrow trying to reproduce this on pure Calcite.
>>>>>>>>
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer 
>>>>>>>> <tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Andrew,
>>>>>>>>>
>>>>>>>>> Thanks for writing a test for this use case. Without a WHERE clause it 
>>>>>>>>> works as expected in our test cases too. Please add a WHERE clause 
>>>>>>>>> to the second SELECT. With the query below it does not return the 
>>>>>>>>> column names. I tested this locally as well.
>>>>>>>>>
>>>>>>>>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM 
>>>>>>>>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable 
>>>>>>>>> WHERE id > 1
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud <apill...@google.com> 
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> +dev@beam.apache.org
>>>>>>>>>>
>>>>>>>>>> I tried reproducing this but was not successful; the output schema 
>>>>>>>>>> was as expected. I added the following to 
>>>>>>>>>> BeamSqlMultipleSchemasTest.java at head. (I did discover that 
>>>>>>>>>> PAssert.that(result).containsInAnyOrder(output) doesn't validate 
>>>>>>>>>> column names, however.)
>>>>>>>>>>
>>>>>>>>>>   @Test
>>>>>>>>>>   public void testSelectAs() {
>>>>>>>>>>     PCollection<Row> input = pipeline.apply(create(row(1, 
>>>>>>>>>> "strstr")));
>>>>>>>>>>
>>>>>>>>>>     PCollection<Row> result =
>>>>>>>>>>         input.apply(SqlTransform.query("WITH tempTable (id, v) AS 
>>>>>>>>>> (SELECT f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS 
>>>>>>>>>> fout_int, v AS fout_string FROM tempTable"));
>>>>>>>>>>
>>>>>>>>>>     Schema output_schema =
>>>>>>>>>>       
>>>>>>>>>> Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>>>>>>>>>>     assertThat(result.getSchema(), equalTo(output_schema));
>>>>>>>>>>
>>>>>>>>>>     Row output = Row.withSchema(output_schema).addValues(1, 
>>>>>>>>>> "strstr").build();
>>>>>>>>>>     PAssert.that(result).containsInAnyOrder(output);
>>>>>>>>>>     pipeline.run();
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer 
>>>>>>>>>> <tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Kenn,
>>>>>>>>>>>
>>>>>>>>>>> Thank you for replying back to my email.
>>>>>>>>>>>
>>>>>>>>>>> I was under the same impression about Calcite. But I wrote a test 
>>>>>>>>>>> on Calcite 1.28 too. It works without the issue that I see on Beam.
>>>>>>>>>>>
>>>>>>>>>>> Here is my test case. If you want, you can also run it on Calcite. 
>>>>>>>>>>> Please put it under core/src/test/resources/sql as a text file and 
>>>>>>>>>>> run the CoreQuidemTest class.
>>>>>>>>>>>
>>>>>>>>>>> !use post
>>>>>>>>>>> !set outputformat mysql
>>>>>>>>>>>
>>>>>>>>>>> #Test aliases with with clause
>>>>>>>>>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>>>>>>>>>>> "hr"."emps"."name" as v from "hr"."emps")
>>>>>>>>>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable 
>>>>>>>>>>> WHERE tempTable.v <> '11' ;
>>>>>>>>>>> +-----+-----------+
>>>>>>>>>>> | ID  | value     |
>>>>>>>>>>> +-----+-----------+
>>>>>>>>>>> | 100 | Bill      |
>>>>>>>>>>> | 110 | Theodore  |
>>>>>>>>>>> | 150 | Sebastian |
>>>>>>>>>>> | 200 | Eric      |
>>>>>>>>>>> +-----+-----------+
>>>>>>>>>>> (4 rows)
>>>>>>>>>>>
>>>>>>>>>>> !ok
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles <k...@apache.org> 
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Looking at the code that turns a logical CalcRel into a 
>>>>>>>>>>>> BeamCalcRel I do not see any obvious cause for this: 
>>>>>>>>>>>> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
>>>>>>>>>>>>
>>>>>>>>>>>> I don't like to guess that upstream libraries have the bug, but in 
>>>>>>>>>>>> this case I wonder if the alias is lost in the Calcite optimizer 
>>>>>>>>>>>> rule for merging the projects and filters into a Calc.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles <k...@apache.org> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not sure I understand the question, but I do see an issue.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Context: "CalcRel" is an optimized relational operation that is 
>>>>>>>>>>>>> somewhat like ParDo, with a small snippet of a single-assignment 
>>>>>>>>>>>>> DSL embedded in it. Calcite will choose to merge all the projects 
>>>>>>>>>>>>> and filters into the node, and then generates Java bytecode to 
>>>>>>>>>>>>> directly execute the DSL.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Problem: it looks like the CalcRel has output columns with 
>>>>>>>>>>>>> aliases "id" and "v" where it should have output columns with 
>>>>>>>>>>>>> aliases "id" and "value".
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay <al...@google.com> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Adding: @Andrew Pilloud @Kenneth Knowles
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user 
>>>>>>>>>>>>>> <u...@beam.apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am using Beam 2.43 with Calcite SQL with Java.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a query with a WITH clause and some aliasing. It looks like 
>>>>>>>>>>>>>>> the Beam query optimizer drops the SELECT statement's aliases 
>>>>>>>>>>>>>>> after optimizing my query. Can you help me identify where the 
>>>>>>>>>>>>>>> problem is?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is my query:
>>>>>>>>>>>>>>> INFO: SQL:
>>>>>>>>>>>>>>> WITH `tempTable` (`id`, `v`) AS (SELECT 
>>>>>>>>>>>>>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`, 
>>>>>>>>>>>>>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
>>>>>>>>>>>>>>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT 
>>>>>>>>>>>>>>> `tempTable`.`id` AS `id`, `tempTable`.`v` AS `value`
>>>>>>>>>>>>>>> FROM `tempTable` AS `tempTable`
>>>>>>>>>>>>>>> WHERE `tempTable`.`v` <> '11')
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is the Calcite plan; look at LogicalProject(id=[$0], 
>>>>>>>>>>>>>>> value=[$1]) in the SQL plan.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jan 12, 2023 12:19:08 PM 
>>>>>>>>>>>>>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner 
>>>>>>>>>>>>>>> convertToBeamRel
>>>>>>>>>>>>>>> INFO: SQLPlan>
>>>>>>>>>>>>>>> LogicalProject(id=[$0], value=[$1])
>>>>>>>>>>>>>>>   LogicalFilter(condition=[<>($1, '11')])
>>>>>>>>>>>>>>>     LogicalProject(id=[$1.f_nestedInt], v=[$1.f_nestedString])
>>>>>>>>>>>>>>>       BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But the Beam plan does not have a LogicalProject(id=[$0], 
>>>>>>>>>>>>>>> value=[$1]) or similar.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jan 12, 2023 12:19:08 PM 
>>>>>>>>>>>>>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner 
>>>>>>>>>>>>>>> convertToBeamRel
>>>>>>>>>>>>>>> INFO: BEAMPlan>
>>>>>>>>>>>>>>> BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t1.f_nestedInt], 
>>>>>>>>>>>>>>> expr#3=[$t1.f_nestedString], expr#4=['11':VARCHAR], 
>>>>>>>>>>>>>>> expr#5=[<>($t3, $t4)], id=[$t2], v=[$t3], $condition=[$t5])
>>>>>>>>>>>>>>>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
