[ https://issues.apache.org/jira/browse/FLINK-25084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jing Zhang closed FLINK-25084. ------------------------------ Fix Version/s: 1.15.0 1.14.1 1.13.4 Resolution: Fixed > Field names must be unique. Found duplicates > -------------------------------------------- > > Key: FLINK-25084 > URL: https://issues.apache.org/jira/browse/FLINK-25084 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Affects Versions: 1.13.2 > Environment: AWS Kinesis Application in Zeppelin > Apache Flink 1.13, Apache Zeppelin 0.9 > > Reporter: Ivan Budanaev > Priority: Major > Fix For: 1.15.0, 1.14.1, 1.13.4 > > Attachments: Screenshot 2021-11-28 at 13.10.57.png > > > I am getting a "Field names must be unique. Found duplicates" error when > trying to aggregate a column used as a descriptor in HOP windowing. > Consider the following example, where *events_table* reads from a Kinesis stream (its > definition is given below). I get the error "Field names must be unique. Found > duplicates: [ts]" when trying to run the following SQL in a Kinesis Data > Analytics application in Zeppelin: > {code:sql} > %flink.ssql(type=update) > -- insert into learn_actions_deduped > SELECT window_start, window_end, uuid, event_type, max(ts) as max_event_ts > FROM TABLE(HOP(TABLE events_table, DESCRIPTOR(ts), INTERVAL '5' SECONDS, > INTERVAL '15' MINUTES)) > GROUP BY window_start, window_end, uuid, event_type; > {code} > The question is: how can I use the descriptor column in an aggregation without > having to duplicate it? 
> The error details: > java.io.IOException: Fail to run stream sql job > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172) > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105) > at > org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:503) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:266) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112) > at > org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47) > at > org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744) > at org.apache.zeppelin.scheduler.Job.run(Job.java:172) > at > org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) > at > org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: java.lang.RuntimeException: Error while applying rule > PullUpWindowTableFunctionIntoWindowAggregateRule, args > [rel#1172:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE](input=RelSubset#1170,groupBy=uuid, > event_type,window=HOP(win_start=[window_start], 
win_end=[window_end], > size=[15 min], slide=[5 s]),select=uuid, event_type, MAX(ts) AS max_event_ts, > start('w$) AS window_start, end('w$) AS window_end), > rel#1179:StreamPhysicalExchange.STREAM_PHYSICAL.hash[2, 3]true.None: > 0.[NONE].[NONE](input=RelSubset#1169,distribution=hash[uuid, event_type]), > rel#1168:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE](input=RelSubset#1167,select=window_start, window_end, uuid, > event_type, CAST(ts) AS ts), > rel#1166:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE](input=RelSubset#1165,window=HOP(time_col=[ts], size=[15 min], > slide=[5 s]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1510) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1460) > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161) > ... 16 more > Caused by: java.lang.RuntimeException: Error occurred while applying rule > PullUpWindowTableFunctionIntoWindowAggregateRule > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161) > at > org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268) > at > org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283) > at > org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229) > ... 42 more > Caused by: org.apache.flink.table.api.ValidationException: Field names must > be unique. 
Found duplicates: [ts] > at > org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272) > at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157) > at org.apache.flink.table.types.logical.RowType.of(RowType.java:297) > at org.apache.flink.table.types.logical.RowType.of(RowType.java:289) > at > org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86) > at > org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409) > at > org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391) > at > org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443) > at java.base/java.util.HashMap.hash(HashMap.java:339) > at java.base/java.util.HashMap.get(HashMap.java:552) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148) > ... 
46 more > {code:sql} > CREATE TABLE events_table ( > uuid varchar(36), > event_type VARCHAR(20), > ts TIMESTAMP(3), > WATERMARK FOR ts AS ts - INTERVAL '5' SECOND > ) > PARTITIONED BY (event_type) > WITH ( > 'connector' = 'kinesis', > 'stream' = 'kinesis-event-stream', > 'aws.region' = 'us-west-2', > 'scan.stream.initpos' = 'TRIM_HORIZON', > 'format' = 'json', > 'scan.stream.recordpublisher' = 'EFO', > 'scan.stream.efo.consumername' = 'learn-actions-efo', > 'scan.stream.efo.registration' = 'LAZY', -- EAGER > 'json.timestamp-format.standard' = 'ISO-8601' > ); > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)