[ https://issues.apache.org/jira/browse/FLINK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17189137#comment-17189137 ]
Nico Kruber commented on FLINK-19112:
-------------------------------------
A mock MetricGroup has the disadvantage that the user doesn't immediately see
that their optimised function no longer produces any metrics... but accompanied
by a WARN in the logs, this should be fine: the function may be optimised in
some incarnations but not in others, and both should just work.
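As a rough illustration of that idea, here is a minimal, self-contained sketch of a "warn-once, drop-everything" metric group. Note this is hypothetical code, not Flink's actual {{MetricGroup}} API: the {{SimpleMetricGroup}} interface, the {{constantReductionGroup()}} factory, and the class name are all invented for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch (NOT Flink's real API): during constant-expression
// reduction, hand the UDF a metric group that silently drops registrations
// and emits a single WARN, instead of throwing
// UnsupportedOperationException from open().
public class NoOpMetricsSketch {

    /** Invented stand-in for the small slice of a metric group used here. */
    interface SimpleMetricGroup {
        void counter(String name);
    }

    static final AtomicBoolean warned = new AtomicBoolean(false);

    /** Returns a group that ignores registrations, warning exactly once. */
    static SimpleMetricGroup constantReductionGroup() {
        return name -> {
            if (warned.compareAndSet(false, true)) {
                System.err.println(
                        "WARN: metrics registered during plan optimization are ignored");
            }
            // Intentionally a no-op: no metric is actually created.
        };
    }

    public static void main(String[] args) {
        SimpleMetricGroup group = constantReductionGroup();
        group.counter("numInvocations"); // logs the single WARN
        group.counter("numErrors");      // silently dropped
        System.out.println("open() completed without an exception");
    }
}
```

The point of the sketch is only the trade-off from the comment: {{open()}} succeeds in both the optimising and the runtime incarnation, at the cost of metrics silently disappearing in the former, which the WARN is meant to surface.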
Catching the exception and then not optimising sounds compelling at first, but
I'm not sure it helps much in the general case: you would at least have to clean
up after the failed {{open()}} call, and at that point we don't actually know
which state the UDF object is in. It may also hide other errors that were not
expected and should be fixed (both in user code and in framework code).
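To make the concern concrete, here is a simplified, self-contained sketch of the "catch and skip reduction" approach. Everything in it ({{MyUdf}}, {{tryReduce}}, the boolean flag) is invented for illustration and does not reflect Flink's actual {{ExpressionReducer}} internals.

```java
// Hypothetical sketch (NOT Flink internals): why catching the exception
// during constant-expression reduction is problematic. If open() throws
// part-way through, the UDF instance is left in an unknown, half-initialized
// state, and the catch block also swallows any genuine bug in open().
public class SkipReductionSketch {

    /** Invented stand-in for a scalar UDF whose open() needs runtime services. */
    static class MyUdf {
        boolean opened = false;

        void open(boolean metricsAvailable) {
            // Any initialization done before this point would already have
            // mutated the instance when the exception fires.
            if (!metricsAvailable) {
                throw new UnsupportedOperationException(
                        "getMetricGroup is not supported when optimizing");
            }
            opened = true;
        }

        int eval(int id) {
            return id;
        }
    }

    /** Tries to reduce eval(arg) to a constant; returns null if it gives up. */
    static Integer tryReduce(MyUdf udf, int arg) {
        try {
            udf.open(false); // optimizer context: no metrics available
            return udf.eval(arg);
        } catch (UnsupportedOperationException e) {
            // We no longer know what state `udf` is in, and a real bug in
            // open() would be indistinguishable from the metrics limitation.
            return null; // fall back: leave the expression un-reduced
        }
    }

    public static void main(String[] args) {
        MyUdf udf = new MyUdf();
        Integer reduced = tryReduce(udf, 42);
        System.out.println("reduced = " + reduced + ", opened = " + udf.opened);
    }
}
```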
> No access to metric group in ScalarFunction when optimizing
> -----------------------------------------------------------
>
> Key: FLINK-19112
> URL: https://issues.apache.org/jira/browse/FLINK-19112
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Reporter: Nico Kruber
> Assignee: Timo Walther
> Priority: Major
> Attachments: MetricsGroupBug.java
>
>
> Under some circumstances, I cannot access {{context.getMetricGroup()}} in a
> {{ScalarFunction}} like this (full job attached):
> {code:java}
> public static class MyUDF extends ScalarFunction {
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         super.open(context);
>         context.getMetricGroup();
>     }
>
>     public Integer eval(Integer id) {
>         return id;
>     }
> }
> {code}
> which leads to this exception:
> {code:java}
> Exception in thread "main" java.lang.UnsupportedOperationException: getMetricGroup is not supported when optimizing
>     at org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249)
>     at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57)
>     at ExpressionReducer$2.open(Unknown Source)
>     at org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118)
>     at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696)
>     at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618)
>     at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>     at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
>     at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
>     at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50)
> {code}
> I also tried to work around this with a try-catch, assuming that the method
> is called once during optimisation and again at runtime. However, it seems
> that {{open()}} is actually only called once (during optimization), giving
> me no chance to access the metric group.
> Removing the WHERE condition before the UDF call also seems to fix it, but
> it shouldn't be that way...
--
This message was sent by Atlassian Jira
(v8.3.4#803005)