[ 
https://issues.apache.org/jira/browse/FLINK-3888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582114#comment-15582114
 ] 

ASF GitHub Bot commented on FLINK-3888:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2606#discussion_r83632808
  
    --- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 ---
    @@ -1513,14 +1513,21 @@ private void 
finalizeWorksetIteration(IterationDescriptor descr) {
                
                String convAggName = 
aggs.getConvergenceCriterionAggregatorName();
                ConvergenceCriterion<?> convCriterion = 
aggs.getConvergenceCriterion();
    -           
    +
                if (convCriterion != null || convAggName != null) {
    -                   throw new CompilerException("Error: Cannot use custom 
convergence criterion with workset iteration. Workset iterations have implicit 
convergence criterion where workset is empty.");
    +                   if (convCriterion == null) {
    +                           throw new CompilerException("Error: Convergence 
criterion aggregator set, but criterion is null.");
    +                   }
    +                   if (convAggName == null) {
    +                           throw new CompilerException("Error: Aggregator 
convergence criterion set, but aggregator is null.");
    +                   }
    +
    +                   syncConfig.setConvergenceCriterion(convAggName, 
convCriterion);
                }
                
                
headConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
 new LongSumAggregator());
                
syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
 new LongSumAggregator());
    -           
syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
 new WorksetEmptyConvergenceCriterion());
    +           
syncConfig.setDefaultConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
 new WorksetEmptyConvergenceCriterion());
    --- End diff --
    
    Sure that's possible, but each iteration will have its own TaskConfig.


> Custom Aggregator with Convergence can't be registered directly with 
> DeltaIteration
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-3888
>                 URL: https://issues.apache.org/jira/browse/FLINK-3888
>             Project: Flink
>          Issue Type: Bug
>          Components: Iterations
>            Reporter: Martin Liesenberg
>            Assignee: Vasia Kalavri
>
> Contrary to the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html]
>  the method to add an aggregator with a custom convergence criterion to a 
> DeltaIteration is not exposed directly to DeltaIteration, but can only be 
> accessed via the {{aggregatorRegistry}}.
> Moreover, when registering an aggregator with a custom convergence criterion  
> and running the program, the following exception appears in the logs:
> {noformat}
> Error: Cannot use custom convergence criterion with workset iteration. 
> Workset iterations have implicit convergence criterion where workset is empty.
> org.apache.flink.optimizer.CompilerException: Error: Cannot use custom 
> convergence criterion with workset iteration. Workset iterations have 
> implicit convergence criterion where workset is empty.
>       at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.finalizeWorksetIteration(JobGraphGenerator.java:1518)
>       at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:198)
>       at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:164)
>       at 
> org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
>       at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>       at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> {noformat}
> The issue has been found while discussing FLINK-2926



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to