[ https://issues.apache.org/jira/browse/FLINK-3888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582114#comment-15582114 ]
ASF GitHub Bot commented on FLINK-3888:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2606#discussion_r83632808

    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---
    @@ -1513,14 +1513,21 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
     		String convAggName = aggs.getConvergenceCriterionAggregatorName();
     		ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
    -
    +
     		if (convCriterion != null || convAggName != null) {
    -			throw new CompilerException("Error: Cannot use custom convergence criterion with workset iteration. Workset iterations have implicit convergence criterion where workset is empty.");
    +			if (convCriterion == null) {
    +				throw new CompilerException("Error: Convergence criterion aggregator set, but criterion is null.");
    +			}
    +			if (convAggName == null) {
    +				throw new CompilerException("Error: Aggregator convergence criterion set, but aggregator is null.");
    +			}
    +
    +			syncConfig.setConvergenceCriterion(convAggName, convCriterion);
     		}

     		headConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
     		syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
    -		syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
    +		syncConfig.setDefaultConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
    --- End diff --

    Sure that's possible, but each iteration will have its own TaskConfig.

> Custom Aggregator with Convergence can't be registered directly with DeltaIteration
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-3888
>                 URL: https://issues.apache.org/jira/browse/FLINK-3888
>             Project: Flink
>          Issue Type: Bug
>          Components: Iterations
>            Reporter: Martin Liesenberg
>            Assignee: Vasia Kalavri
>
> Contrary to the
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html]
> the method to add an aggregator with a custom convergence criterion to a
> DeltaIteration is not exposed directly to DeltaIteration, but can only be
> accessed via the {{aggregatorRegistry}}.
> Moreover, when registering an aggregator with a custom convergence criterion
> and running the program, the following exception appears in the logs:
> {noformat}
> Error: Cannot use custom convergence criterion with workset iteration.
> Workset iterations have implicit convergence criterion where workset is empty.
> org.apache.flink.optimizer.CompilerException: Error: Cannot use custom
> convergence criterion with workset iteration. Workset iterations have
> implicit convergence criterion where workset is empty.
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.finalizeWorksetIteration(JobGraphGenerator.java:1518)
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:198)
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:164)
> 	at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76)
> 	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
> 	at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> 	at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> {noformat}
> The issue has been found while discussing FLINK-2926

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)