[
https://issues.apache.org/jira/browse/BEAM-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663003#comment-16663003
]
Anton Kedin commented on BEAM-5384:
-----------------------------------
So, the issue seems to happen in cases when there's nothing to project, i.e.
the fields in the query are all the fields in the schema. At least in those
cases Calcite doesn't generate the Project, and our AggregationRule only
applies when there's a projection. It needs the projection because it extracts
window information from there, and without a projection this window
information is not immediately available.
[https://github.com/apache/beam/pull/6816] adds support for aggregations
without projections. It may not be an exhaustive solution, as it expects the
TableScan as a child of the Aggregation node. So if there are cases where the
Aggregation node has neither a TableScan nor a Projection child, we still
don't handle them. And without projections we currently don't support windows
in SQL.
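
To illustrate the matching problem described above, here is a minimal sketch in
plain Java. It is not Beam or Calcite code: Node, projectOnlyRuleMatches and
extendedRuleMatches are hypothetical names invented for this example, and the
real rule is written against Calcite's planner API. It only models the idea
that a rule requiring a Project child cannot fire when the Aggregate sits
directly on top of the scan.

```java
import java.util.Arrays;
import java.util.List;

public class AggregationRuleSketch {

    /** Hypothetical stand-in for a relational plan node (not a Calcite class). */
    public static final class Node {
        public final String kind;
        public final List<Node> inputs;

        public Node(String kind, Node... inputs) {
            this.kind = kind;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Models a rule that fires only for an Aggregate whose single child is a Project. */
    public static boolean projectOnlyRuleMatches(Node node) {
        return node.kind.equals("Aggregate")
            && node.inputs.size() == 1
            && node.inputs.get(0).kind.equals("Project");
    }

    /** Models the extended behavior: a Project or a TableScan child is accepted. */
    public static boolean extendedRuleMatches(Node node) {
        if (!node.kind.equals("Aggregate") || node.inputs.size() != 1) {
            return false;
        }
        String child = node.inputs.get(0).kind;
        return child.equals("Project") || child.equals("TableScan");
    }

    public static void main(String[] args) {
        // Plan shape when the query selects a subset of the fields.
        Node withProject =
            new Node("Aggregate", new Node("Project", new Node("TableScan")));
        // Plan shape when the query uses every field, so no Project is generated.
        Node withoutProject = new Node("Aggregate", new Node("TableScan"));

        System.out.println("project-only rule, with Project:    "
            + projectOnlyRuleMatches(withProject));
        System.out.println("project-only rule, without Project: "
            + projectOnlyRuleMatches(withoutProject));
        System.out.println("extended rule, without Project:     "
            + extendedRuleMatches(withoutProject));
    }
}
```

In this toy model the project-only rule rejects the second plan shape, which
corresponds to the planner finding no rule that can implement the Aggregate.
An Aggregate over any other kind of child is still rejected by both rules,
matching the caveat above that the fix may not be exhaustive.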
> [SQL] Calcite optimizes away LogicalProject
> -------------------------------------------
>
> Key: BEAM-5384
> URL: https://issues.apache.org/jira/browse/BEAM-5384
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Reporter: Anton Kedin
> Assignee: Anton Kedin
> Priority: Major
> Fix For: Not applicable
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> *From
> [https://stackoverflow.com/questions/52313324/beam-sql-wont-work-when-using-aggregation-in-statement-cannot-plan-execution]
> :*
> I have a basic Beam pipeline that reads from GCS, does a Beam SQL transform
> and writes the results to BigQuery.
> When I don't do any aggregation in my SQL statement it works fine:
> {code:java}
> ..
> PCollection<Row> outputStream =
> sqlRows.apply(
> "sql_transform",
> SqlTransform.query("select views from PCOLLECTION"));
> outputStream.setCoder(SCHEMA.getRowCoder());
> ..
> {code}
> However, when I try to aggregate with a sum, it fails with a
> CannotPlanException:
> {code:java}
> ..
> PCollection<Row> outputStream =
> sqlRows.apply(
> "sql_transform",
> SqlTransform.query("select wikimedia_project, "
> + "sum(views) from PCOLLECTION group by wikimedia_project"));
> outputStream.setCoder(SCHEMA.getRowCoder());
> ..
> {code}
> Stacktrace:
> {code:java}
> Step #1: 11:47:37,562 0 [main] INFO
> org.apache.beam.runners.dataflow.DataflowRunner -
> PipelineOptions.filesToStage was not specified. Defaulting to files from the
> classpath: will stage 117 files. Enable logging at DEBUG level to see which
> files will be staged.
> Step #1: 11:47:39,845 2283 [main] INFO
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQL:
> Step #1: SELECT `PCOLLECTION`.`wikimedia_project`, SUM(`PCOLLECTION`.`views`)
> Step #1: FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
> Step #1: GROUP BY `PCOLLECTION`.`wikimedia_project`
> Step #1: 11:47:40,387 2825 [main] INFO
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQLPlan>
> Step #1: LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
> Step #1: BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> Step #1:
> Step #1: Exception in thread "main"
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
> Node [rel#7:Subset#1.BEAM_LOGICAL.[]] could not be implemented; planner
> state:
> Step #1:
> Step #1: Root: rel#7:Subset#1.BEAM_LOGICAL.[]
> Step #1: Original rel:
> Step #1: LogicalAggregate(subset=[rel#7:Subset#1.BEAM_LOGICAL.[]],
> group=[{0}], EXPR$1=[SUM($1)]): rowcount = 10.0, cumulative cost =
> {11.375000476837158 rows, 0.0 cpu, 0.0 io}, id = 5
> Step #1: BeamIOSourceRel(subset=[rel#4:Subset#0.BEAM_LOGICAL.[]],
> table=[[beam, PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0
> rows, 101.0 cpu, 0.0 io}, id = 2
> Step #1:
> Step #1: Sets:
> Step #1: Set#0, type: RecordType(VARCHAR wikimedia_project, BIGINT views)
> Step #1: rel#4:Subset#0.BEAM_LOGICAL.[], best=rel#2, importance=0.81
> Step #1: rel#2:BeamIOSourceRel.BEAM_LOGICAL.[](table=[beam,
> PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
> Step #1: rel#10:Subset#0.ENUMERABLE.[], best=rel#9, importance=0.405
> Step #1:
> rel#9:BeamEnumerableConverter.ENUMERABLE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[]),
> rowcount=100.0, cumulative cost={1.7976931348623157E308 rows,
> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> Step #1: Set#1, type: RecordType(VARCHAR wikimedia_project, BIGINT EXPR$1)
> Step #1: rel#6:Subset#1.NONE.[], best=null, importance=0.9
> Step #1:
> rel#5:LogicalAggregate.NONE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[],group={0},EXPR$1=SUM($1)),
> rowcount=10.0, cumulative cost={inf}
> Step #1: rel#7:Subset#1.BEAM_LOGICAL.[], best=null, importance=1.0
> Step #1:
> rel#8:AbstractConverter.BEAM_LOGICAL.[](input=rel#6:Subset#1.NONE.[],convention=BEAM_LOGICAL,sort=[]),
> rowcount=10.0, cumulative cost={inf}
> Step #1:
> Step #1:
> Step #1: at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:448)
> Step #1: at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:298)
> Step #1: at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:666)
> Step #1: at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> Step #1: at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:336)
> Step #1: at
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:138)
> Step #1: at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:105)
> Step #1: at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:96)
> Step #1: at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
> Step #1: at
> org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> Step #1: at
> org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> Step #1: at
> org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338)
> Step #1: at org.polleyg.TemplatePipeline.main(TemplatePipeline.java:59)
> Step #1: :run FAILED
> Step #1:
> Step #1: FAILURE: Build failed with an exception.
> {code}
> I'm using Beam 2.6.0
> Am I missing something obvious?