[https://issues.apache.org/jira/browse/FLINK-3888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569630#comment-15569630]
ASF GitHub Bot commented on FLINK-3888:
---------------------------------------
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/2606#discussion_r83069609
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---
@@ -1513,14 +1513,21 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
     String convAggName = aggs.getConvergenceCriterionAggregatorName();
     ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
-
+
     if (convCriterion != null || convAggName != null) {
-        throw new CompilerException("Error: Cannot use custom convergence criterion with workset iteration. Workset iterations have implicit convergence criterion where workset is empty.");
+        if (convCriterion == null) {
+            throw new CompilerException("Error: Convergence criterion aggregator set, but criterion is null.");
+        }
+        if (convAggName == null) {
+            throw new CompilerException("Error: Aggregator convergence criterion set, but aggregator is null.");
+        }
+
+        syncConfig.setConvergenceCriterion(convAggName, convCriterion);
     }
     headConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
     syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
-    syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
+    syncConfig.setDefaultConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
--- End diff --
Is it safe to use a fixed name for the aggregator? Could we not have
multiple iterations in a single job?
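
To make the question concrete, here is a minimal sketch of when a fixed aggregator name would or would not collide. It does not use Flink classes: the map-based registry, the `register` helper, and the `FIXED_NAME` value are all hypothetical stand-ins. It only assumes that a name must be unique within one registry, so the answer depends on whether each iteration gets its own registry or several iterations share one.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregatorNameScope {

    // Hypothetical fixed name; not the actual value of
    // WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME.
    public static final String FIXED_NAME = "workset-empty";

    // Registers an aggregator type under a name in the given registry.
    // Returns true if the name was free, false if it was already taken.
    public static boolean register(Map<String, String> registry, String name, String aggregatorType) {
        return registry.putIfAbsent(name, aggregatorType) == null;
    }

    public static void main(String[] args) {
        // Assumption: each iteration owns a separate registry.
        Map<String, String> firstIteration = new HashMap<>();
        Map<String, String> secondIteration = new HashMap<>();

        // The same fixed name is accepted once in each registry.
        System.out.println(register(firstIteration, FIXED_NAME, "LongSumAggregator"));  // true
        System.out.println(register(secondIteration, FIXED_NAME, "LongSumAggregator")); // true

        // Within a single shared registry, the second registration is rejected.
        System.out.println(register(firstIteration, FIXED_NAME, "LongSumAggregator"));  // false
    }
}
```

If the head and sync configurations are created per iteration, the fixed name would be safe under this model; if they were shared across iterations, it would not be.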
> Custom Aggregator with Convergence can't be registered directly with DeltaIteration
> -----------------------------------------------------------------------------------
>
> Key: FLINK-3888
> URL: https://issues.apache.org/jira/browse/FLINK-3888
> Project: Flink
> Issue Type: Bug
> Components: Iterations
> Reporter: Martin Liesenberg
> Assignee: Vasia Kalavri
>
> Contrary to the
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html],
> the method to add an aggregator with a custom convergence criterion to a
> DeltaIteration is not exposed directly on DeltaIteration and can only be
> accessed via the {{aggregatorRegistry}}.
> Moreover, when an aggregator with a custom convergence criterion is
> registered and the program is run, the following exception appears in the logs:
> {noformat}
> Error: Cannot use custom convergence criterion with workset iteration. Workset iterations have implicit convergence criterion where workset is empty.
> org.apache.flink.optimizer.CompilerException: Error: Cannot use custom convergence criterion with workset iteration. Workset iterations have implicit convergence criterion where workset is empty.
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.finalizeWorksetIteration(JobGraphGenerator.java:1518)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:198)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:164)
>     at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>     at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> {noformat}
> The issue was found while discussing FLINK-2926.