[ 
https://issues.apache.org/jira/browse/FLINK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-19112:
--------------------------------
    Description: 
Under some circumstances, I cannot access {{context.getMetricGroup()}} in a 
{{ScalarFunction}} like this (full job attached):
{code:java}
  public static class MyUDF extends ScalarFunction {
    @Override
    public void open(FunctionContext context) throws Exception {
      super.open(context);
      context.getMetricGroup();
    }

    public Integer eval(Integer id) {
      return id;
    }
  }
{code}
which leads to this exception:
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: 
getMetricGroup is not supported when optimizing
        at 
org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249)
        at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57)
        at ExpressionReducer$2.open(Unknown Source)
        at 
org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118)
        at 
org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696)
        at 
org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618)
        at 
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
        at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
        at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
        at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50)
{code}
I also tried to work around this with a try-catch, assuming that this method is 
called once during optimisation and another time at runtime. However, it seems 
as if {{open()}} is actually only called once (during optimization) thus giving 
me no choice to access the metrics group.

It seems that removing the where condition before my UDF call also fixes it but 
it shouldn't be that way...

  was:
Under some circumstances, I cannot access {{context.getMetricGroup()}} in a 
{{ScalarFunction}} like this (full job attached):
{code:java}
  public static class MyUDF extends ScalarFunction {
    @Override
    public void open(FunctionContext context) throws Exception {
      super.open(context);
      context.getMetricGroup();
    }

    public Integer eval(Integer id) {
      return id;
    }
  }
{code}
which leads to this exception:
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: 
getMetricGroup is not supported when optimizing
        at 
org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249)
        at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57)
        at ExpressionReducer$2.open(Unknown Source)
        at 
org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118)
        at 
org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696)
        at 
org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618)
        at 
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
        at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
        at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
        at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50)
{code}
I also tried to work around this with a try-catch, assuming that this method is 
called once during optimisation and another time at runtime. However, it seems 
as if {{open()}} is actually only called once (during optimization) thus giving 
me no choice to access the metrics group.


> No access to metric group in ScalarFunction when optimizing
> -----------------------------------------------------------
>
>                 Key: FLINK-19112
>                 URL: https://issues.apache.org/jira/browse/FLINK-19112
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>            Reporter: Nico Kruber
>            Priority: Major
>         Attachments: MetricsGroupBug.java
>
>
> Under some circumstances, I cannot access {{context.getMetricGroup()}} in a 
> {{ScalarFunction}} like this (full job attached):
> {code:java}
>   public static class MyUDF extends ScalarFunction {
>     @Override
>     public void open(FunctionContext context) throws Exception {
>       super.open(context);
>       context.getMetricGroup();
>     }
>     public Integer eval(Integer id) {
>       return id;
>     }
>   }
> {code}
> which leads to this exception:
> {code:java}
> Exception in thread "main" java.lang.UnsupportedOperationException: 
> getMetricGroup is not supported when optimizing
>       at 
> org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249)
>       at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57)
>       at ExpressionReducer$2.open(Unknown Source)
>       at 
> org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118)
>       at 
> org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696)
>       at 
> org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618)
>       at 
> org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303)
>       at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
>       at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
>       at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>       at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>       at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>       at scala.collection.Iterator.foreach(Iterator.scala:937)
>       at scala.collection.Iterator.foreach$(Iterator.scala:937)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>       at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>       at 
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
>       at 
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
>       at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50)
> {code}
> I also tried to work around this with a try-catch, assuming that this method 
> is called once during optimisation and another time at runtime. However, it 
> seems as if {{open()}} is actually only called once (during optimization) 
> thus giving me no choice to access the metrics group.
> It seems that removing the where condition before my UDF call also fixes it 
> but it shouldn't be that way...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to