[ https://issues.apache.org/jira/browse/FLINK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936410#comment-17936410 ]

Shengkai Fang edited comment on FLINK-37222 at 3/18/25 8:25 AM:
----------------------------------------------------------------

Based on my analysis of related issues, the core problem appears to be that the 
{{CatalogView}} instance returned by the Catalog loses its time attributes in 
the schema returned by {{getUnresolvedSchema()}}, forcing {{CatalogManager}} to 
redundantly re-parse the view's SQL to obtain accurate type information for 
subsequent operations.

To address this, our investigation suggests that converting {{CatalogView}} to 
{{QueryOperationCatalogView}} may be unnecessary and potentially problematic 
when operations span different {{TableEnvironment}} instances. Through code 
analysis, we propose creating a {{ResolvedCatalogView}} during the 
SqlNode-to-Operation conversion phase. This approach would prevent 
{{CatalogManager}} from attempting to convert the view object to a 
{{QueryOperationCatalogView}}, thereby maintaining temporal attribute integrity 
throughout the process.
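For context, the "belongs to a different planner" assertion in the reported stack trace fires because a relational expression is pinned to the planner instance that created it. The sketch below is a minimal, self-contained analogy of that identity check; {{Planner}}, {{RelNode}}, and {{tryRegister}} are illustrative stand-ins, not actual Calcite or Flink classes.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class PlannerIdentityDemo {

    /** Stand-in for a cost-based planner such as Calcite's VolcanoPlanner. */
    static final class Planner {
        private final List<RelNode> registered = new ArrayList<>();

        /** Nodes remember the planner that created them. */
        RelNode createProject(String exprs) {
            return new RelNode(this, "LogicalProject(" + exprs + ")");
        }

        /** Mirrors the identity assertion seen in VolcanoPlanner.registerImpl. */
        void register(RelNode rel) {
            if (rel.owner != this) {
                throw new AssertionError("Relational expression " + rel
                        + " belongs to a different planner than is currently being used.");
            }
            registered.add(rel);
        }
    }

    /** Stand-in for a relational expression tied to one planner instance. */
    static final class RelNode {
        final Planner owner;
        final String digest;

        RelNode(Planner owner, String digest) {
            this.owner = owner;
            this.digest = digest;
        }

        @Override
        public String toString() {
            return digest;
        }
    }

    /** Returns true iff registering rel with planner succeeds. */
    static boolean tryRegister(Planner planner, RelNode rel) {
        try {
            planner.register(rel);
            return true;
        } catch (AssertionError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Planner of the session that created the view's plan.
        Planner sessionA = new Planner();
        // Planner of a different session that later queries the view.
        Planner sessionB = new Planner();

        RelNode viewPlan = sessionA.createProject("$2, $1");

        System.out.println("same planner:  " + tryRegister(sessionA, viewPlan)); // true
        System.out.println("cross planner: " + tryRegister(sessionB, viewPlan)); // false
    }
}
{code}

This is why caching planner-bound plan objects (as a {{QueryOperationCatalogView}} would) is fragile across sessions, and why a {{ResolvedCatalogView}}, which carries only the resolved schema rather than a plan, avoids the assertion.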


was (Author: fsk119):
After reading all related issues, I think the problem is: the `CatalogView` 
returned by the Catalog loses its time attribute in the schema returned by 
`getUnresolvedSchema`. So CatalogManager needs to parse the SQL again to get 
the correct types for later usage. 

I think we don't need to resolve the CatalogView to a QueryOperationCatalogView, 
because the creation and the usage may happen across TableEnvironment instances. 
By reading the code, I think we can create a ResolvedCatalogView when converting 
the SqlNode to an Operation. In that case, the CatalogManager will not try to 
convert the view object to a QueryOperationCatalogView.

> Table planner exception when sql client submit job
> --------------------------------------------------
>
>                 Key: FLINK-37222
>                 URL: https://issues.apache.org/jira/browse/FLINK-37222
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0.0
>            Reporter: Zakelly Lan
>            Assignee: Dawid Wysakowicz
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>         Attachments: flink-root-sql-client-master-1-1.c-76646bdbb8bdab89.log
>
>
> When testing [Nexmark|https://github.com/nexmark/nexmark] on the release-2.0 
> branch, a table-planner-related exception is thrown by the SQL client:
> {code:java}
> // Omit some stacktraces before ....
> Caused by: 
> org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to 
> execute the operation d3516bc9-44c1-428e-ac10-2fb0ddd0c825.
>       at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:415)
>       at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:268)
>       at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>       at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       ... 1 more
> Caused by: java.lang.AssertionError: Sql optimization: Assertion error: 
> Relational expression rel#714:LogicalProject.NONE.any.None: 
> 0.[NONE].[NONE](input=LogicalAggregate#712,exprs=[$2, $1]) belongs to a 
> different planner than is currently being used.
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>       at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>       at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>       at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1352)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:930)
>       at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:662)
>       at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:500)
>       at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:262)
>       at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:217)
>       at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:221)
>       at 
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:120)
>       at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:259)
>       ... 7 more
> Caused by: java.lang.AssertionError: Relational expression 
> rel#714:LogicalProject.NONE.any.None: 
> 0.[NONE].[NONE](input=LogicalAggregate#712,exprs=[$2, $1]) belongs to a 
> different planner than is currently being used.
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1248)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
>       at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:273)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1271)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
>       at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:273)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1271)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:95)
>       at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:273)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1271)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:498)
>       at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:314)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>       ... 35 more
> {code}
> I've tested many commits and identified that the change from 
> https://github.com/apache/flink/pull/25952 may cause this. After reverting 
> it on release-2.0, the problem is resolved.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
