[
https://issues.apache.org/jira/browse/FLINK-3888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569631#comment-15569631
]
ASF GitHub Bot commented on FLINK-3888:
---------------------------------------
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/2606#discussion_r83075316
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
---
@@ -137,42 +138,51 @@ public void
testConnectedComponentsWithParametrizableConvergence() {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testDeltaConnectedComponentsWithParametrizableConvergence()
{
+ try {
+
+ // name of the aggregator that checks for convergence
+ final String UPDATED_ELEMENTS = "updated.elements.aggr";
+
+ // the iteration stops if less than this number of
elements change value
+ final long convergence_threshold = 3;
+
+ final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> initialSolutionSet =
env.fromCollection(verticesInput);
+ DataSet<Tuple2<Long, Long>> edges =
env.fromCollection(edgesInput);
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>>
iteration =
+
initialSolutionSet.iterateDelta(initialSolutionSet, 10, 0);
+
+ // register the convergence criterion
+
iteration.registerAggregationConvergenceCriterion(UPDATED_ELEMENTS,
+ new LongSumAggregator(), new
UpdatedElementsConvergenceCriterion(convergence_threshold));
+
+ DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
iteration.getWorkset().join(edges).where(0).equalTo(0)
+ .with(new NeighborWithComponentIDJoin())
+ .groupBy(0).min(1);
+
+ DataSet<Tuple2<Long, Long>> updatedComponentId =
+
verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
+ .flatMap(new
MinimumIdFilter(UPDATED_ELEMENTS));
+
+ List<Tuple2<Long, Long>> result =
iteration.closeWith(updatedComponentId, updatedComponentId).collect();
+ Collections.sort(result, new
JavaProgramTestBase.TupleComparator<Tuple2<Long, Long>>());
+
+ assertEquals(expectedResult, result);
+ }
+ catch (Exception e) {
--- End diff --
Catching and re-throwing exceptions looks to be unnecessary.
> Custom Aggregator with Convergence can't be registered directly with
> DeltaIteration
> -----------------------------------------------------------------------------------
>
> Key: FLINK-3888
> URL: https://issues.apache.org/jira/browse/FLINK-3888
> Project: Flink
> Issue Type: Bug
> Components: Iterations
> Reporter: Martin Liesenberg
> Assignee: Vasia Kalavri
>
> Contrary to the
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html]
> the method to add an aggregator with a custom convergence criterion to a
> DeltaIteration is not exposed directly to DeltaIteration, but can only be
> accessed via the {{aggregatorRegistry}}.
> Moreover, when registering an aggregator with a custom convergence criterion
> and running the program, the following exception appears in the logs:
> {noformat}
> Error: Cannot use custom convergence criterion with workset iteration.
> Workset iterations have implicit convergence criterion where workset is empty.
> org.apache.flink.optimizer.CompilerException: Error: Cannot use custom
> convergence criterion with workset iteration. Workset iterations have
> implicit convergence criterion where workset is empty.
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.finalizeWorksetIteration(JobGraphGenerator.java:1518)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:198)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:164)
> at
> org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> {noformat}
> The issue has been found while discussing FLINK-2926
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)