[
https://issues.apache.org/jira/browse/FLINK-20660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sujun closed FLINK-20660.
-------------------------
Resolution: Not A Bug
> Time window operator with computed column triggers an exception in batch mode
> -----------------------------------------------------------------------------
>
> Key: FLINK-20660
> URL: https://issues.apache.org/jira/browse/FLINK-20660
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0, 1.12.0
> Reporter: sujun
> Priority: Minor
>
> {{A time window operator with a computed column triggers an exception in batch
> mode; it may be a bug in BatchExecWindowAggregateRule.}}
> {{My test code:}}
> {code:java}
> public class WindowAggWithBigintTest {
> public static void main(String[] args) throws Exception {
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
> tEnv.registerFunction("longToTimestamp",new LongToTimestamp());
>
> String ddl = "CREATE TABLE source(occur_time bigint,rowtime AS
> longToTimestamp(occur_time)) WITH ('connector' = 'filesystem', 'format' =
> 'orc', 'path' = '/path/to/orc')";
> tEnv.executeSql(ddl);
> Table table = tEnv.sqlQuery("select TUMBLE_START(rowtime,
> INTERVAL '1' HOUR) as ts,count(1) as ct from source group by TUMBLE(rowtime,
> INTERVAL '1' HOUR)");
> DiscardingOutputFormat<String> outputFormat = new
> DiscardingOutputFormat();
> TableResultSink tableResultSink = new
> TableResultSink(table.getSchema(), outputFormat);
> tEnv.registerTableSink("sink",tableResultSink);
> table.insertInto("sink");
> tEnv.execute("test");
> }
> private static class TableResultSink implements StreamTableSink<String>
> {
> private final TableSchema schema;
> private final DataType rowType;
> private final OutputFormat<String> outputFormat;
>
> TableResultSink(TableSchema schema, OutputFormat<String>
> outputFormat) {
> this.schema = schema;
> this.rowType = schema.toRowDataType();
> this.outputFormat = outputFormat;
> }
> @Override
> public DataType getConsumedDataType() {
> return rowType;
> }
> @Override
> public TableSchema getTableSchema() {
> return schema;
> }
> @Override
> public TableSink<String> configure(String[] fieldNames,
> TypeInformation<?>[] fieldTypes) {
> throw new UnsupportedOperationException(
> "This sink is configured by passing a static
> schema when initiating");
> }
> @Override
> public DataStreamSink<?> consumeDataStream(DataStream<String>
> dataStream) {
> return
> dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult");
> }
> }
> }
> {code}
>
> Exception:
>
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying
> rule BatchExecWindowAggregateRule, args
> [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$,
> $f0, 3600000),properties=w$start, w$end, w$rowtime),
> rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog,
> default_database, source, source: [FileSystemTableSource(occur_time,
> rowtime)]],fields=occur_time, rowtime)]
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)
> at
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> at example.WindowAggWithBigintTest.main(WindowAggWithBigintTest.java:34)
> Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input
> fields are: [occur_time]
> at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402)
> at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385)
> at
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720)
> at
> org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161)
> at
> org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)
> ... 27 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)