[ https://issues.apache.org/jira/browse/FLINK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936410#comment-17936410 ]
Shengkai Fang edited comment on FLINK-37222 at 3/18/25 8:25 AM:
----------------------------------------------------------------

Based on my analysis of related issues, the core problem appears to be that the {{CatalogView}} instance returned by the Catalog loses its temporal attributes when accessed through {{getUnresolvedSchema()}}, necessitating redundant SQL re-parsing by CatalogManager to obtain accurate type information for subsequent operations.

To address this, our investigation suggests that converting CatalogView to QueryOperationCatalogView may be unnecessary and potentially problematic when operations span different TableEnvironment instances.

Through code analysis, we propose implementing a ResolvedCatalogView during the SqlNode-to-Operation conversion phase. This approach would prevent CatalogManager from attempting view object conversion to QueryOperationCatalogView, thereby maintaining temporal attribute integrity throughout the process.


was (Author: fsk119):
After reading all related issues, I think the problem is:

The `CatalogView` returned by the Catalog loses its time attribute for method `getUnresolvedSchema`. So CatalogManager needs to parse the SQL again to get the correct type for later usage.

I think we don't need to resolve the CatalogView to QueryOperationCatalogView because the creation and the usage may be across TableEnvironment.

By reading the codes, I think we can create a ResolvedCatalogView when converting the SqlNode to Operation. In this case, the CatalogManager will not try to convert the view object to the QueryOperationCatalogView.
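To illustrate the problem described in the comment above, here is a minimal toy sketch. It is not Flink code: {{ResolvedColumn}}, {{UnresolvedColumn}}, {{toUnresolved}}, {{resolveNaively}} and {{hasRowtime}} are hypothetical names invented for this example, and the real schema classes are considerably richer. The sketch only assumes that an unresolved schema carries a column's name and type but not its time-attribute flag, which is the loss the comment attributes to {{getUnresolvedSchema()}}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT Flink's
// CatalogView, ResolvedCatalogView, or Schema types.
public class ViewSchemaSketch {

    // A column after resolution, including the time-attribute flag.
    static final class ResolvedColumn {
        final String name;
        final String type;
        final boolean rowtime;

        ResolvedColumn(String name, String type, boolean rowtime) {
            this.name = name;
            this.type = type;
            this.rowtime = rowtime;
        }
    }

    // A column in the (assumed) unresolved form: name and type string only.
    static final class UnresolvedColumn {
        final String name;
        final String type;

        UnresolvedColumn(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Converting to the unresolved form drops the rowtime flag.
    static List<UnresolvedColumn> toUnresolved(List<ResolvedColumn> columns) {
        List<UnresolvedColumn> result = new ArrayList<>();
        for (ResolvedColumn c : columns) {
            result.add(new UnresolvedColumn(c.name, c.type));
        }
        return result;
    }

    // Resolving from the unresolved form alone cannot recover the flag.
    static List<ResolvedColumn> resolveNaively(List<UnresolvedColumn> columns) {
        List<ResolvedColumn> result = new ArrayList<>();
        for (UnresolvedColumn c : columns) {
            result.add(new ResolvedColumn(c.name, c.type, false));
        }
        return result;
    }

    // True if any column is marked as a rowtime attribute.
    static boolean hasRowtime(List<ResolvedColumn> columns) {
        for (ResolvedColumn c : columns) {
            if (c.rowtime) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<ResolvedColumn> resolved = Arrays.asList(
                new ResolvedColumn("id", "BIGINT", false),
                new ResolvedColumn("ts", "TIMESTAMP(3)", true));

        // Keeping the resolved columns preserves the time attribute.
        System.out.println(hasRowtime(resolved)); // true

        // A round trip through the unresolved form loses it.
        System.out.println(hasRowtime(resolveNaively(toUnresolved(resolved)))); // false
    }
}
```

In this toy model, holding on to the already resolved columns plays the role the comment assigns to creating a ResolvedCatalogView at conversion time: nothing has to be re-derived from the lossy form later.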
> Table planner exception when sql client submit job
> --------------------------------------------------
>
> Key: FLINK-37222
> URL: https://issues.apache.org/jira/browse/FLINK-37222
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0.0
> Reporter: Zakelly Lan
> Assignee: Dawid Wysakowicz
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 2.0.0
>
> Attachments: flink-root-sql-client-master-1-1.c-76646bdbb8bdab89.log
>
>
> When testing [Nexmark|https://github.com/nexmark/nexmark] on the release-2.0 branch, a table planner related exception was thrown by the sql client:
> {code:java}
> // Omit some stacktraces before ....
> Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation d3516bc9-44c1-428e-ac10-2fb0ddd0c825.
>     at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:415)
>     at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:268)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: java.lang.AssertionError: Sql optimization: Assertion error: Relational expression rel#714:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=LogicalAggregate#712,exprs=[$2, $1]) belongs to a different planner than is currently being used.
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1352)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:930)
>     at org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:662)
>     at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:500)
>     at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:262)
>     at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:217)
>     at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:221)
>     at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:120)
>     at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:259)
>     ... 7 more
> Caused by: java.lang.AssertionError: Relational expression rel#714:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=LogicalAggregate#712,exprs=[$2, $1]) belongs to a different planner than is currently being used.
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1248)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
>     at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:273)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1271)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
>     at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:273)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1271)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
>     at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:273)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1271)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:498)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:314)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     ... 35 more
> {code}
> I've tested many commits and identified that the commit https://github.com/apache/flink/pull/25952 may cause this. Reverting this commit on release-2.0 solves the problem.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)