Jeff Zhang created FLINK-16936:
----------------------------------

             Summary: TableEnv creation and planner execution must be in the same thread
                 Key: FLINK-16936
                 URL: https://issues.apache.org/jira/browse/FLINK-16936
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.10.0
            Reporter: Jeff Zhang


I hit this issue in Zeppelin. Let me first describe Zeppelin's thread model. Here's a simple diagram which consists of 3 threads: scalashell-thread is the thread where the TableEnv is created, python-thread is the Python process thread, and python-javagateway-thread is the thread that handles requests from the python thread (the same as in PyFlink).

Now if I use the following Table API code, I get the following exception.
{code:python}
st_env.from_path("cdn_access_log")\
   .select("uuid, "
           "ip_to_province(client_ip) as province, " 
           "response_size, request_time")\
   .group_by("province")\
   .select( 
           "province, count(uuid) as access_count, " 
           "sum(response_size) as total_download,  " 
           "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic")
{code}
The error I get:
{code:java}
Py4JJavaError: An error occurred while calling o60.insertInto.
: java.lang.RuntimeException: Error while applying rule 
FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args 
[rel#107:LogicalAggregate.NONE.any.None: 
0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
  at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
  at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
  at 
org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error occurred while applying rule 
FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143)
  at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236)
  at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:146)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
  ... 34 more
Caused by: java.lang.NullPointerException
  at scala.Predef$.Double2double(Predef.scala:365)
  at 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:78)
  at 
org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
  at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
Source)
  at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown 
Source)
  at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:936)
  at 
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:347)
  at 
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:330)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1828)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1764)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1939)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:129)
  ... 37 more

(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while 
calling o60.insertInto.\n', JavaObject id=o61), <traceback object at 
0x10fa9efc8>)
{code}

But it works with Flink SQL. After some investigation, I found that the root cause is that in Flink SQL the following code is called in `SqlToRelConverter.java`:
{code}
    RelMetadataQuery.THREAD_PROVIDERS.set(
        JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
{code}
But in the Table API no such code is called, so the RelMetadataProvider won't be set properly if TableEnv creation and planner execution happen in different threads.
It still works when TableEnv creation and planner execution are in the same thread, because TableEnv creation sets the RelMetadataProvider properly in FlinkRelOptClusterFactory.
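The underlying mechanism is that Calcite's `RelMetadataQuery.THREAD_PROVIDERS` is a `ThreadLocal`, so a metadata provider set in the TableEnv-creation thread is invisible to the py4j gateway thread that later runs the planner. A minimal, self-contained sketch of that pitfall (plain Java with no Flink/Calcite dependencies; `PROVIDER` and the string value are just stand-ins, not real Flink names):

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLocalPitfall {
    // Stand-in for RelMetadataQuery.THREAD_PROVIDERS: per-thread storage.
    static final ThreadLocal<String> PROVIDER = new ThreadLocal<>();

    /** Reads PROVIDER from a freshly spawned thread, like the py4j gateway thread would. */
    static String readFromNewThread() throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread gateway = new Thread(() -> seen.set(PROVIDER.get()));
        gateway.start();
        gateway.join();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // "scalashell-thread": creates the environment and sets the provider.
        PROVIDER.set("metadata-provider");
        System.out.println(PROVIDER.get());      // same thread: metadata-provider
        // "python-javagateway-thread": the value set above is not visible here,
        // which is why the planner later dereferences null and NPEs.
        System.out.println(readFromNewThread()); // other thread: null
    }
}
```

This is why the NullPointerException surfaces deep in `FlinkLogicalAggregate.computeSelfCost`: the cost computation goes through the metadata query, whose provider was never set on the gateway thread.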




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
