Jeff Zhang created FLINK-16936:
----------------------------------
Summary: TableEnv creation and planner execution must be in the
same thread
Key: FLINK-16936
URL: https://issues.apache.org/jira/browse/FLINK-16936
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Jeff Zhang
I hit this issue in Zeppelin. Let me first describe Zeppelin's threading
model. Here's a simple diagram, which consists of 3 threads:
scalashell-thread is the thread where the TableEnv is created, python thread
is the Python process thread, and python-javagateway-thread is the thread that
handles requests from the python thread (the same as in PyFlink).
Now, if I use the following Table API code, I get the exception shown below.
{code:python}
st_env.from_path("cdn_access_log")\
    .select("uuid, "
            "ip_to_province(client_ip) as province, "
            "response_size, request_time")\
    .group_by("province")\
    .select(
        "province, count(uuid) as access_count, "
        "sum(response_size) as total_download, "
        "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")
{code}
This is the error I get:
{code:java}
Py4JJavaError: An error occurred while calling o60.insertInto.
: java.lang.RuntimeException: Error while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args [rel#107:LogicalAggregate.NONE.any.None: 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
    at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236)
    at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:146)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
    ... 34 more
Caused by: java.lang.NullPointerException
    at scala.Predef$.Double2double(Predef.scala:365)
    at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:78)
    at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:936)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:347)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:330)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1828)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1764)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1939)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:129)
    ... 37 more
(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while calling o60.insertInto.\n', JavaObject id=o61), <traceback object at 0x10fa9efc8>)
{code}
The same query works with Flink SQL. After some investigation, I found that
the root cause is that, in Flink SQL, the following code is called in
`SqlToRelConverter.java`:
{code}
RelMetadataQuery.THREAD_PROVIDERS.set(
    JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
{code}
In the Table API, however, no such code is called. In that case the
RelMetadataProvider won't be set properly if TableEnv creation and planner
execution happen in different threads.
It still works if TableEnv creation and planner execution are in the same
thread, because TableEnv creation sets the RelMetadataProvider properly in
FlinkRelOptClusterFactory.
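
To illustrate the mechanism described above, here is a minimal sketch that
uses only the JDK. It assumes that THREAD_PROVIDERS behaves like an ordinary
java.lang.ThreadLocal; it does not call Flink or Calcite, and every name in it
(ThreadLocalProviderDemo, PROVIDER, plan, planOnOtherThread) is hypothetical.
A value set in the creating thread is not visible in another thread unless
that thread sets it again, which is roughly what the Flink SQL path does
before planning.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalProviderDemo {
    // Hypothetical stand-in for a per-thread holder such as
    // RelMetadataQuery.THREAD_PROVIDERS (assumed to be a plain ThreadLocal).
    public static final ThreadLocal<String> PROVIDER = new ThreadLocal<>();

    // Hypothetical stand-in for planner code that needs the provider of the
    // thread it runs on.
    public static String plan() {
        String provider = PROVIDER.get();
        return provider == null
                ? "no provider in this thread"
                : "planned with " + provider;
    }

    // Runs plan() on a fresh thread. If providerToSet is non-null, the
    // provider is set on that thread first, mimicking the SQL code path.
    public static String planOnOtherThread(String providerToSet) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(() -> {
                if (providerToSet != null) {
                    PROVIDER.set(providerToSet);
                }
                return plan();
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        // "TableEnv creation" thread: the provider is set here only.
        PROVIDER.set("flink-provider");
        System.out.println(plan());                              // same thread
        System.out.println(planOnOtherThread(null));             // other thread, not set
        System.out.println(planOnOtherThread("flink-provider")); // other thread, set again
    }
}
```

In this sketch the second call corresponds to the Table API case reported
here (planning on the gateway thread without the provider), and the third
call corresponds to setting the provider on the planning thread first.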