[FLINK-1959] [runtime] Support accumulators in chained functions after a non-UDF operation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73493335 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73493335 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73493335 Branch: refs/heads/master Commit: 73493335f4dbecbb4f1f9f954b08534a5e35ca90 Parents: cf4f22e Author: Stephan Ewen <se...@apache.org> Authored: Tue May 12 23:00:29 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Tue May 12 23:00:29 2015 +0200 ---------------------------------------------------------------------- .../common/accumulators/AccumulatorHelper.java | 14 +++--- .../runtime/operators/RegularPactTask.java | 46 ++++++++++++-------- 2 files changed, 37 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java index 9b0e019..3e2e359 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java @@ -40,7 +40,8 @@ public class AccumulatorHelper { if (ownAccumulator == null) { // Take over counter from chained task target.put(otherEntry.getKey(), otherEntry.getValue()); - } else { + } + else { // Both should have the same type AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(), ownAccumulator.getClass(), otherEntry.getValue().getClass()); @@ -122,12 +123,13 @@ public class AccumulatorHelper { return builder.toString(); } - public static void resetAndClearAccumulators( - Map<String, Accumulator<?, ?>> accumulators) { - for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) { - entry.getValue().resetLocal(); + public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accumulators) { + if (accumulators != null) { + for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) { + entry.getValue().resetLocal(); + } + accumulators.clear(); } - accumulators.clear(); } public static Map<String, Accumulator<?, ?>> copy(final Map<String, Accumulator<?, http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index c844d8e..1c3328e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; @@ -70,6 +71,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -508,14 +510,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // JobManager. close() has been called earlier for all involved UDFs // (using this.stub.close() and closeChainedTasks()), so UDFs can no longer // modify accumulators; - if (this.stub != null) { - // collect the counters from the stub - if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) { - Map<String, Accumulator<?, ?>> accumulators = - FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(); - RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks); - } - } + + // collect the counters from the udf in the core driver + Map<String, Accumulator<?, ?>> accumulators = + FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(); + + // collect accumulators from chained tasks and report them + reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks); } catch (Exception ex) { // close the input, but do not report any exceptions, since we already have another root cause @@ -572,16 +573,25 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // We can merge here the accumulators from the stub and the chained // tasks. Type conflicts can occur here if counters with same name but // different type were used. - for (ChainedDriver<?, ?> chainedTask : chainedTasks) { - if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) { - Map<String, Accumulator<?, ?>> chainedAccumulators = - FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators(); - AccumulatorHelper.mergeInto(accumulators, chainedAccumulators); + + if (!chainedTasks.isEmpty()) { + if (accumulators == null) { + accumulators = new HashMap<String, Accumulator<?, ?>>(); + } + + for (ChainedDriver<?, ?> chainedTask : chainedTasks) { + RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null); + if (rc != null) { + Map<String, Accumulator<?, ?>> chainedAccumulators = rc.getAllAccumulators(); + if (chainedAccumulators != null) { + AccumulatorHelper.mergeInto(accumulators, chainedAccumulators); + } + } } } // Don't report if the UDF didn't collect any accumulators - if (accumulators.size() == 0) { + if (accumulators == null || accumulators.size() == 0) { return; } @@ -592,9 +602,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // (e.g. in iterations) and we don't want to count twice. This may not be // done before sending AccumulatorHelper.resetAndClearAccumulators(accumulators); + for (ChainedDriver<?, ?> chainedTask : chainedTasks) { - if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) { - AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators()); + RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null); + if (rc != null) { + AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators()); } } } @@ -1140,7 +1152,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i } catch (InterruptedException iex) { throw new RuntimeException("Interrupted while waiting for input " + index + " to become available."); } catch (IOException ioex) { - throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + "."); + throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + "."); } } }