[ 
https://issues.apache.org/jira/browse/FLINK-15840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17027997#comment-17027997
 ] 

Jingsong Lee edited comment on FLINK-15840 at 2/1/20 6:49 AM:
--------------------------------------------------------------

The problem is that CatalogSourceTable.toRel needs to obtain the translator for 
computed columns. At present, CatalogSourceTable.toRel is called from two places:
 # The parsing stage, which passes in the correct computed-column translator.
 # The rule-optimization stage, where TableScanRule does not pass in the correct 
translator, so an error is reported (see the sketch after this list).
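
To make the failure mode concrete, here is a minimal, self-contained sketch 
(editor's illustration, not the actual planner code: the two traits only mirror 
Calcite's ToRelContext and Flink's FlinkToRelContext from the stack trace below, 
and the converter method name is made up):

{code:scala}
// Hypothetical stand-ins that mirror the real interfaces.
trait ToRelContext // plays org.apache.calcite.plan.RelOptTable.ToRelContext
trait FlinkToRelContext extends ToRelContext {
  // Stand-in for the computed-column translator the planner needs.
  def createToRexConverter(): AnyRef
}

object CastSketch extends App {
  // Parsing stage: the planner supplies its own context, so the cast works.
  val fromParser: ToRelContext = new FlinkToRelContext {
    def createToRexConverter(): AnyRef = new AnyRef
  }
  // Rule-optimization stage: TableScanRule supplies an anonymous context
  // (created via ViewExpanders), which is NOT a FlinkToRelContext.
  val fromRule: ToRelContext = new ToRelContext {}

  // Unconditional cast, as on CatalogSourceTable.scala:89 in the trace below.
  def toRel(context: ToRelContext): Unit =
    context.asInstanceOf[FlinkToRelContext].createToRexConverter()

  toRel(fromParser) // succeeds
  toRel(fromRule)   // throws ClassCastException, matching the reported error
}
{code}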

There are two possible solutions:
 # Stop using ToRelContext to pass the computed-column translators. Pass them 
through FlinkContext instead, so that the correct translators can be obtained 
at any stage.
 # In CatalogSourceTable.toRel, report an error only when there is a computed 
column and the translator cannot be obtained, and let all other cases pass 
through normally (see the sketch after this list). The disadvantage is that 
computed columns cannot currently be supported via the Table API.
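
A minimal sketch of what solution #2 could look like (editor's illustration, 
reusing the hypothetical stand-in types from the sketch above; the real change 
belongs in CatalogSourceTable.scala and its error message would differ):

{code:scala}
// Guarded variant: only fail when the translator is actually required.
def toRel(context: ToRelContext, hasComputedColumns: Boolean): Unit =
  context match {
    case flinkCtx: FlinkToRelContext =>
      // Parsing stage: translator available, computed columns can be expanded.
      flinkCtx.createToRexConverter()
    case _ if hasComputedColumns =>
      // Rule stage with computed columns: fail fast with a descriptive
      // message instead of an opaque ClassCastException.
      throw new UnsupportedOperationException(
        "Computed columns are not supported when the table is accessed " +
        "through the Table API.")
    case _ =>
      // Rule stage without computed columns: proceed without the translator.
  }
{code}

This keeps the Table API path working for plain tables (such as the reproducer 
below) while surfacing a clear error for the unsupported computed-column case.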

Considering that:
 * there is no plan to support computed columns in the Table API,
 * solution #1 changes too much, and
 * solution #1 would in any case only be an intermediate version; for the next 
release, we are considering separating out the computed-column translation and 
completing it in the parsing stage.

We therefore choose solution #2.



> ClassCastException is thrown when using tEnv.from for temp/catalog table under 
> Blink planner
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15840
>                 URL: https://issues.apache.org/jira/browse/FLINK-15840
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: sunjincheng
>            Assignee: Jingsong Lee
>            Priority: Blocker
>             Fix For: 1.10.0
>
>
> ClassCastException is thrown when using ConnectorDescriptor under the Blink planner.
> The exception can be reproduced with the following test:
> {code:scala}
> @Test
> def testDescriptor(): Unit = {
>   this.env = StreamExecutionEnvironment.getExecutionEnvironment
>   val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>   this.tEnv = StreamTableEnvironment.create(env, setting)
>   tEnv.connect(new FileSystem().path("/tmp/input"))
>     .withFormat(new OldCsv().field("word", DataTypes.STRING()))
>     .withSchema(new Schema().field("word", DataTypes.STRING()))
>     .createTemporaryTable("sourceTable")
>   val sink = new TestingAppendSink
>   tEnv.from("sourceTable").toAppendStream[Row].addSink(sink)
>   env.execute()
> }
> {code}
> Exceptions:
> {code:java}
> java.lang.ClassCastException: org.apache.calcite.plan.ViewExpanders$2 cannot be cast to org.apache.flink.table.planner.calcite.FlinkToRelContext
>  at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:89)
>  at org.apache.calcite.rel.rules.TableScanRule.onMatch(TableScanRule.java:55)
>  at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>  at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
>  at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>  at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.immutable.Range.foreach(Range.scala:160)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:167)
>  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89)
>  at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>  at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
>  at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
>  at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
>  at org.apache.flink.table.planner.runtime.stream.table.CalcITCase.testDescriptor(CalcITCase.scala:541)
> {code}
> It seems we should not cast `context` to `FlinkToRelContext` directly, as it 
> could also be an anonymous class from `org.apache.calcite.plan.ViewExpanders`.
> What do you think?


