[
https://issues.apache.org/jira/browse/FLINK-16936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jeff Zhang updated FLINK-16936:
-------------------------------
Description:
I hit this issue in Zeppelin. Let me first describe Zeppelin's threading
model. There are 3 threads: scalashell-thread is the thread where the
TableEnvironment is created, python thread is the Python process thread, and
python-javagateway-thread is the thread that handles requests from the python
thread (same as PyFlink).
Now if I use the following Table API code, I get the exception below.
{code:python}
st_env.from_path("cdn_access_log")\
    .select("uuid, "
            "ip_to_province(client_ip) as province, "
            "response_size, request_time")\
    .group_by("province")\
    .select(
        "province, count(uuid) as access_count, "
        "sum(response_size) as total_download, "
        "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
    .insert_into("cdn_access_statistic")
{code}
The error I get:
{code:java}
Py4JJavaError: An error occurred while calling o60.insertInto.
: java.lang.RuntimeException: Error while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args [rel#107:LogicalAggregate.NONE.any.None: 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
    at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236)
    at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:146)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
    ... 34 more
Caused by: java.lang.NullPointerException
    at scala.Predef$.Double2double(Predef.scala:365)
    at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:78)
    at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
    at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:936)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:347)
    at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:330)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1828)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1764)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1939)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:129)
    ... 37 more
(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while calling o60.insertInto.\n', JavaObject id=o61), <traceback object at 0x10fa9efc8>)
{code}
But it works for Flink SQL. After some investigation, I found that the root
cause is that, in Flink SQL, the following code is called in
`SqlToRelConverter.java`:
{code}
RelMetadataQuery.THREAD_PROVIDERS.set(
JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
{code}
But in the Table API, no such code is called. In that case the
RelMetadataProvider won't be set properly if TableEnv creation and planner
execution happen in different threads.
It still works if TableEnv creation and planner execution are in the same
thread, because TableEnv creation sets the RelMetadataProvider properly in
FlinkRelOptClusterFactory.
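To illustrate the thread-local behaviour described above, here is a minimal, self-contained Java sketch. It does not use Calcite or Flink: `THREAD_PROVIDER` is a hypothetical stand-in for `RelMetadataQuery.THREAD_PROVIDERS`, and a plain string stands in for the real provider object. It only shows that a value set on the creating thread is not visible on another thread unless that thread sets it again.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadLocalProviderDemo {
    // Hypothetical stand-in for RelMetadataQuery.THREAD_PROVIDERS.
    static final ThreadLocal<String> THREAD_PROVIDER = new ThreadLocal<>();

    /**
     * Starts a fresh thread, optionally sets the thread-local there first,
     * and returns the value that thread observes.
     */
    static String readOnNewThread(String valueToSetFirst) {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            if (valueToSetFirst != null) {
                // Analogous to what the SQL path does before planning.
                THREAD_PROVIDER.set(valueToSetFirst);
            }
            seen.set(THREAD_PROVIDER.get());
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // "TableEnv creation" thread sets the provider.
        THREAD_PROVIDER.set("provider");
        System.out.println("creating thread sees: " + THREAD_PROVIDER.get());
        // A different thread (like python-javagateway-thread) sees nothing.
        System.out.println("other thread sees: " + readOnNewThread(null));
        // Setting it on the executing thread makes it visible there.
        System.out.println("other thread after set: " + readOnNewThread("provider"));
    }
}
```

In this sketch the second thread observes `null`, which corresponds to the missing provider behind the NullPointerException in the stack trace; setting the value on the executing thread, as the SQL path does, avoids it.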
> TablEnv creation and planner execution must be in the same thread
> ------------------------------------------------------------------
>
> Key: FLINK-16936
> URL: https://issues.apache.org/jira/browse/FLINK-16936
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: Jeff Zhang
> Priority: Major