[ https://issues.apache.org/jira/browse/FLINK-3888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569631#comment-15569631 ]

ASF GitHub Bot commented on FLINK-3888:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2606#discussion_r83075316
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java ---
    @@ -137,42 +138,51 @@ public void testConnectedComponentsWithParametrizableConvergence() {
                        fail(e.getMessage());
                }
        }
    +
    +   @Test
    +   public void testDeltaConnectedComponentsWithParametrizableConvergence() {
    +           try {
    +
    +                   // name of the aggregator that checks for convergence
    +                   final String UPDATED_ELEMENTS = "updated.elements.aggr";
    +
    +                   // the iteration stops if less than this number of elements change value
    +                   final long convergence_threshold = 3;
    +
    +                   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +                   DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
    +                   DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
    +
    +                   DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
    +                                   initialSolutionSet.iterateDelta(initialSolutionSet, 10, 0);
    +
    +                   // register the convergence criterion
    +                   iteration.registerAggregationConvergenceCriterion(UPDATED_ELEMENTS,
    +                                   new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergence_threshold));
    +
    +                   DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.getWorkset().join(edges).where(0).equalTo(0)
    +                                   .with(new NeighborWithComponentIDJoin())
    +                                   .groupBy(0).min(1);
    +
    +                   DataSet<Tuple2<Long, Long>> updatedComponentId =
    +                                   verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
    +                                                   .flatMap(new MinimumIdFilter(UPDATED_ELEMENTS));
    +
    +                   List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId, updatedComponentId).collect();
    +                   Collections.sort(result, new JavaProgramTestBase.TupleComparator<Tuple2<Long, Long>>());
    +
    +                   assertEquals(expectedResult, result);
    +           }
    +           catch (Exception e) {
    --- End diff --
    
    Catching the exception and failing with its message looks unnecessary; the test method could simply declare `throws Exception`.
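A quick illustration of the reviewer's point, in plain Java (`runJob` is a hypothetical stand-in for the Flink job body, not part of the PR): wrapping the call in try/catch and failing with `e.getMessage()` keeps only the message, whereas declaring `throws Exception` on the test method would let JUnit report the exception with its full stack trace.

```java
public class CatchAndFailDemo {

    // Hypothetical stand-in for the Flink job body that can throw.
    static void runJob() throws Exception {
        throw new Exception("convergence criterion rejected");
    }

    public static void main(String[] args) {
        try {
            runJob();
        } catch (Exception e) {
            // Mirrors fail(e.getMessage()): only the message survives,
            // and the original stack trace is discarded.
            System.out.println("test failure: " + e.getMessage());
        }
        // Preferred alternative: declare `throws Exception` on the test
        // method and let the test runner report the full stack trace.
    }
}
```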


> Custom Aggregator with Convergence can't be registered directly with 
> DeltaIteration
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-3888
>                 URL: https://issues.apache.org/jira/browse/FLINK-3888
>             Project: Flink
>          Issue Type: Bug
>          Components: Iterations
>            Reporter: Martin Liesenberg
>            Assignee: Vasia Kalavri
>
> Contrary to the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html],
> the method to add an aggregator with a custom convergence criterion to a 
> DeltaIteration is not exposed directly on DeltaIteration; it can only be 
> accessed via the {{aggregatorRegistry}}.
> Moreover, when registering an aggregator with a custom convergence criterion 
> and running the program, the following exception appears in the logs:
> {noformat}
> Error: Cannot use custom convergence criterion with workset iteration. 
> Workset iterations have implicit convergence criterion where workset is empty.
> org.apache.flink.optimizer.CompilerException: Error: Cannot use custom 
> convergence criterion with workset iteration. Workset iterations have 
> implicit convergence criterion where workset is empty.
>       at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.finalizeWorksetIteration(JobGraphGenerator.java:1518)
>       at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:198)
>       at 
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:164)
>       at 
> org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
>       at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>       at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> {noformat}
> The issue was found while discussing FLINK-2926.
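The API gap the report describes can be sketched generically. These are NOT Flink's real classes, only illustrative stand-ins for the shape of the problem: the registration method lives on a registry object, so a caller had to reach through the registry instead of calling the iteration directly, as the documentation suggests.

```java
import java.util.HashMap;
import java.util.Map;

public class RegistryIndirectionSketch {

    // Illustrative registry: holds named convergence criteria.
    static class AggregatorRegistry {
        private final Map<String, Object> criteria = new HashMap<>();

        void registerAggregationConvergenceCriterion(String name, Object criterion) {
            criteria.put(name, criterion);
        }

        boolean isRegistered(String name) {
            return criteria.containsKey(name);
        }
    }

    // Illustrative iteration: before the fix, only getAggregators() was available.
    static class DeltaIteration {
        private final AggregatorRegistry aggregators = new AggregatorRegistry();

        AggregatorRegistry getAggregators() {
            return aggregators;
        }

        // What the fix adds: a convenience delegate directly on the iteration,
        // matching what the documentation describes.
        void registerAggregationConvergenceCriterion(String name, Object criterion) {
            aggregators.registerAggregationConvergenceCriterion(name, criterion);
        }
    }

    public static void main(String[] args) {
        DeltaIteration iteration = new DeltaIteration();
        // With the delegate, one call on the iteration is enough:
        iteration.registerAggregationConvergenceCriterion("updated.elements.aggr", new Object());
        System.out.println(iteration.getAggregators().isRegistered("updated.elements.aggr"));
    }
}
```

The delegate pattern here is the conventional fix for this kind of gap: keep the registry as the single owner of state and add a thin forwarding method on the public type users actually hold.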



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
