I filed a JIRA for this: https://issues.apache.org/jira/browse/BEAM-12331
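
For anyone reproducing this, here is a minimal sketch of the result the quoted query should produce once planning succeeds. It is plain Python, not Beam or Calcite API: the function name `running_totals` and the sample rows are made up for illustration, and the input column order (item, category, purchases) is taken from the RecordType in the plan dump quoted below.

```python
from collections import defaultdict
from itertools import accumulate


def running_totals(rows, threshold=3):
    """Model of: SELECT item, purchases, category,
    SUM(purchases) OVER (PARTITION BY category ORDER BY purchases
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_purchases
    FROM PCOLLECTION WHERE purchases > 3

    rows is an iterable of (item, category, purchases) tuples.
    """
    # WHERE purchases > 3: the filter is applied before the window is evaluated.
    kept = [row for row in rows if row[2] > threshold]

    # PARTITION BY category
    partitions = defaultdict(list)
    for item, category, purchases in kept:
        partitions[category].append((item, purchases))

    result = []
    for category, members in partitions.items():
        # ORDER BY purchases (ties would be ordered arbitrarily in SQL)
        members.sort(key=lambda member: member[1])
        # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is a running sum.
        totals = accumulate(purchases for _, purchases in members)
        for (item, purchases), total in zip(members, totals):
            result.append((item, purchases, category, total))
    return result


# Hypothetical sample data, not taken from the Beam test suite.
sample_rows = [
    ("apple", "fruit", 2),
    ("banana", "fruit", 5),
    ("cherry", "fruit", 4),
    ("carrot", "vegetable", 6),
    ("kale", "vegetable", 1),
]

for row in running_totals(sample_rows):
    print(row)
```

The point of the sketch is only that the filter runs first and the window is computed over the surviving rows, which matches the LogicalFilter sitting below the windowed LogicalProject in the plan.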


On Wed, May 12, 2021 at 12:53 PM Burkay Gur <burkay...@gmail.com> wrote:

> It looks to me like the `matches(RelOptRuleCall x)` function in
> BeamCalcRule is being skipped due to the logic here:
>
>
> https://github.com/apache/beam/pull/11975/files#diff-919be1e4bcc11c17b725cbf04168b583886ffe16286f9291893247954128ad81R43
>
> Below are the plans with and without the WHERE clause.
>
> *Without the WHERE clause:*
>
> INFO: SQLPlan>
> LogicalProject(item=[$0], purchases=[$2], category=[$1],
> total_purchases=[SUM($2) OVER (PARTITION BY $1 ORDER BY $2 ROWS BETWEEN
> UNBOUNDED PRECEDING AND CURRENT ROW)])
>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> INFO: BEAMPlan>
> BeamCalcRel(expr#0..3=[{inputs}], item=[$t0], purchases=[$t2],
> category=[$t1], total_purchases=[$t3])
>   BeamWindowRel(window#0=[window(partition {1} order by [2] rows between
> UNBOUNDED PRECEDING and CURRENT ROW aggs [SUM($2)])])
>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
>
> *With the WHERE clause:*
> SQLPlan>
> LogicalProject(item=[$0], purchases=[$2], category=[$1],
> total_purchases=[SUM($2) OVER (PARTITION BY $1 ORDER BY $2 ROWS BETWEEN
> UNBOUNDED PRECEDING AND CURRENT ROW)])
>   LogicalFilter(condition=[>($2, 3)])
>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> Caused by:
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
> *There are not enough rules to produce a node with desired properties:*
> convention=BEAM_LOGICAL. All the inputs have relevant nodes, however the
> cost is still infinite.
> Root: rel#11:Subset#2.BEAM_LOGICAL
> Original rel:
> LogicalProject(subset=[rel#11:Subset#2.BEAM_LOGICAL], item=[$0],
> purchases=[$2], category=[$1], total_purchases=[SUM($2) OVER (PARTITION BY
> $1 ORDER BY $2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]):
> rowcount = 50.0, cumulative cost = {inf}, id = 9
>   LogicalFilter(subset=[rel#8:Subset#1.NONE], condition=[>($2, 3)]):
> rowcount = 50.0, cumulative cost = {inf}, id = 7
>     BeamIOSourceRel(subset=[rel#6:Subset#0.BEAM_LOGICAL], table=[[beam,
> PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 cpu, 0.0 cpuRate
> }, id = 3
>
> Sets:
> Set#0, type: RecordType(VARCHAR item, VARCHAR category, INTEGER purchases)
> rel#6:Subset#0.BEAM_LOGICAL, best=rel#3, importance=0.7290000000000001
> rel#3:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, PCOLLECTION]),
> rowcount=100.0, cumulative cost={100.0 cpu, 0.0 cpuRate }
> rel#40:Subset#0.ENUMERABLE, best=rel#39, importance=0.36450000000000005
> rel#39:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#6),
> rowcount=100.0, cumulative cost={1.7976931348623157E308 cpu,
> 1.7976931348623157E308 cpuRate }
> Set#1, type: RecordType(VARCHAR item, VARCHAR category, INTEGER purchases)
> rel#8:Subset#1.NONE, best=null, importance=0.81
> rel#7:LogicalFilter.NONE(input=RelSubset#6,condition=>($2, 3)),
> rowcount=50.0, cumulative cost={inf}
> rel#26:LogicalCalc.NONE(input=RelSubset#6,expr#0..2={inputs},expr#3=3,expr#4=>($t2,
> $t3),item=$t0,category=$t1,purchases=$t2,$condition=$t4), rowcount=50.0,
> cumulative cost={inf}
> rel#34:Subset#1.BEAM_LOGICAL, best=rel#33, importance=0.405
> rel#33:BeamCalcRel.BEAM_LOGICAL(input=RelSubset#6,expr#0..2={inputs},expr#3=3,expr#4=>($t2,
> $t3),item=$t0,category=$t1,purchases=$t2,$condition=$t4), rowcount=50.0,
> cumulative cost={200.0 cpu, 0.0 cpuRate }
> rel#36:Subset#1.ENUMERABLE, best=rel#35, importance=0.405
> rel#35:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#34),
> rowcount=50.0, cumulative cost={1.7976931348623157E308 cpu,
> 1.7976931348623157E308 cpuRate }
> Set#2, type: RecordType(VARCHAR item, INTEGER purchases, VARCHAR category,
> INTEGER total_purchases)
> rel#10:Subset#2.NONE, best=null, importance=0.9
> rel#9:LogicalProject.NONE(input=RelSubset#8,item=$0,purchases=$2,category=$1,total_purchases=SUM($2)
> OVER (PARTITION BY $1 ORDER BY $2 ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW)), rowcount=50.0, cumulative cost={inf}
> rel#18:LogicalProject.NONE(input=RelSubset#17,item=$0,purchases=$2,category=$1,total_purchases=$3),
> rowcount=50.0, cumulative cost={inf}
> rel#19:LogicalCalc.NONE(input=RelSubset#17,expr#0..3={inputs},item=$t0,purchases=$t2,category=$t1,total_purchases=$t3),
> rowcount=50.0, cumulative cost={inf}
> rel#25:LogicalCalc.NONE(input=RelSubset#8,expr#0..2={inputs},expr#3=SUM($t2)
> OVER (PARTITION BY $t1 ORDER BY $t2 ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW),item=$t0,purchases=$t2,category=$t1,total_purchases=$t3),
> rowcount=50.0, cumulative cost={inf}
> rel#11:Subset#2.BEAM_LOGICAL, best=null, importance=1.0
> rel#12:AbstractConverter.BEAM_LOGICAL(input=RelSubset#10,convention=BEAM_LOGICAL),
> rowcount=50.0, cumulative cost={inf}
> rel#21:BeamCalcRel.BEAM_LOGICAL(input=RelSubset#20,expr#0..3={inputs},item=$t0,purchases=$t2,category=$t1,total_purchases=$t3),
> rowcount=50.0, cumulative cost={inf}
> rel#24:Subset#2.ENUMERABLE, best=null, importance=0.5
> rel#23:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#11),
> rowcount=50.0, cumulative cost={inf}
> Set#3, type: RecordType(VARCHAR item, VARCHAR category, INTEGER purchases,
> INTEGER w0$o0)
> rel#17:Subset#3.NONE, best=null, importance=0.81
> rel#14:LogicalWindow.NONE(input=RelSubset#8,window#0=window(partition {1}
> order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [SUM($2)])), rowcount=50.0, cumulative cost={inf}
> rel#20:Subset#3.BEAM_LOGICAL, best=null, importance=0.9
> rel#29:BeamWindowRel.BEAM_LOGICAL(input=RelSubset#8,window#0=window(partition
> {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [SUM($2)])), rowcount=50.0, cumulative cost={inf}
> rel#32:Subset#3.ENUMERABLE, best=null, importance=0.45
> rel#31:BeamEnumerableConverter.ENUMERABLE(input=RelSubset#20),
> rowcount=50.0, cumulative cost={inf}
>
> On Wed, May 12, 2021 at 2:27 PM Rui Wang <ruw...@google.com> wrote:
>
>> The exception you shared is truncated, so I am not sure of the root cause
>> (it only shows the top frames, which say that BeamSQL cannot convert the
>> query to pipeline PTransforms).
>>
>> Assuming you have basic knowledge of BeamSQL and Calcite: if you want to
>> contribute to analytic functions, there is a design doc that you can check
>> to understand how a query is mapped to Rel nodes:
>> https://docs.google.com/document/d/1tJapdA7ZNwkU0NaK7p-em0XnpHqNE1pKIXw9hVJkIUg/edit?usp=sharing
>>
>> If you don't have basic knowledge of BeamSQL and Calcite, it may be better
>> to start with smaller starter tasks (fixing bugs or adding operators).
>>
>> -Rui
>>
>> On Wed, May 12, 2021 at 10:30 AM Kyle Weaver <kcwea...@google.com> wrote:
>>
>>> If I'm understanding correctly, this query is taken from an existing
>>> test [1]; the only modification is the addition of "WHERE purchases > 3".
>>>
>>> This seems like a bug -- I'm guessing the planner/matching logic doesn't
>>> handle filters properly. To really figure out what's going on, you'd have
>>> to compare the plans for the same query with and without the filter.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/9dbc7b7ceed7b744d90bbf771a9059ea91965353/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java#L89-L95
>>>
>>> On Tue, May 11, 2021 at 3:08 PM Burkay Gur <burkay...@gmail.com> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> When we try to run the following query on BeamSQL:
>>>>
>>>> SELECT item, purchases, category, sum(purchases) over (PARTITION BY
>>>> category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
>>>> ROW) as total_purchases  FROM PCOLLECTION WHERE purchases > 3
>>>>
>>>> We are getting the following error:
>>>>
>>>> Unable to convert query
>>>> org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to
>>>> convert query SELECT item, purchases, category, sum(purchases) over
>>>> (PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING
>>>> AND CURRENT ROW) as total_purchases  FROM PCOLLECTION WHERE purchases > 3
>>>> at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:212)
>>>> at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:111)
>>>> at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171)
>>>> at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
>>>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
>>>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
>>>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
>>>> at
>>>>
>>>> We traced the issue back to this PR:
>>>> https://github.com/apache/beam/pull/11975, specifically this line:
>>>> https://github.com/apache/beam/pull/11975/files#diff-919be1e4bcc11c17b725cbf04168b583886ffe16286f9291893247954128ad81R43
>>>>
>>>> What are the plans for wider support of analytic functions? If I
>>>> want to contribute, what are the best resources to learn more about how
>>>> the BeamSQL / Calcite integration is set up?
>>>>
>>>> Best,
>>>> Burkay
>>>>
>>>>
