[FLINK-1959] [runtime] Support accumulators in chained functions after a non-UDF operation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73493335
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73493335
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73493335

Branch: refs/heads/master
Commit: 73493335f4dbecbb4f1f9f954b08534a5e35ca90
Parents: cf4f22e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 12 23:00:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 23:00:29 2015 +0200

----------------------------------------------------------------------
 .../common/accumulators/AccumulatorHelper.java  | 14 +++---
 .../runtime/operators/RegularPactTask.java      | 46 ++++++++++++--------
 2 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 9b0e019..3e2e359 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -40,7 +40,8 @@ public class AccumulatorHelper {
                        if (ownAccumulator == null) {
                                // Take over counter from chained task
                                target.put(otherEntry.getKey(), 
otherEntry.getValue());
-                       } else {
+                       }
+                       else {
                                // Both should have the same type
                                
AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
                                                ownAccumulator.getClass(), 
otherEntry.getValue().getClass());
@@ -122,12 +123,13 @@ public class AccumulatorHelper {
                return builder.toString();
        }
 
-       public static void resetAndClearAccumulators(
-                       Map<String, Accumulator<?, ?>> accumulators) {
-               for (Map.Entry<String, Accumulator<?, ?>> entry : 
accumulators.entrySet()) {
-                       entry.getValue().resetLocal();
+       public static void resetAndClearAccumulators(Map<String, Accumulator<?, 
?>> accumulators) {
+               if (accumulators != null) {
+                       for (Map.Entry<String, Accumulator<?, ?>> entry : 
accumulators.entrySet()) {
+                               entry.getValue().resetLocal();
+                       }
+                       accumulators.clear();
                }
-               accumulators.clear();
        }
 
        public static Map<String, Accumulator<?, ?>> copy(final Map<String, 
Accumulator<?,

http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c844d8e..1c3328e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
@@ -70,6 +71,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -508,14 +510,13 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                        // JobManager. close() has been called earlier for all 
involved UDFs
                        // (using this.stub.close() and closeChainedTasks()), 
so UDFs can no longer
                        // modify accumulators;
-                       if (this.stub != null) {
-                               // collect the counters from the stub
-                               if 
(FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != 
null) {
-                                       Map<String, Accumulator<?, ?>> 
accumulators =
-                                                       
FunctionUtils.getFunctionRuntimeContext(this.stub, 
this.runtimeUdfContext).getAllAccumulators();
-                                       
RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, 
this.chainedTasks);
-                               }
-                       }
+
+                       // collect the counters from the udf in the core driver
+                       Map<String, Accumulator<?, ?>> accumulators =
+                                       
FunctionUtils.getFunctionRuntimeContext(this.stub, 
this.runtimeUdfContext).getAllAccumulators();
+                       
+                       // collect accumulators from chained tasks and report 
them
+                       reportAndClearAccumulators(getEnvironment(), 
accumulators, this.chainedTasks);
                }
                catch (Exception ex) {
                        // close the input, but do not report any exceptions, 
since we already have another root cause
@@ -572,16 +573,25 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                // We can merge here the accumulators from the stub and the 
chained
                // tasks. Type conflicts can occur here if counters with same 
name but
                // different type were used.
-               for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-                       if 
(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
-                               Map<String, Accumulator<?, ?>> 
chainedAccumulators =
-                                               
FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), 
null).getAllAccumulators();
-                               AccumulatorHelper.mergeInto(accumulators, 
chainedAccumulators);
+               
+               if (!chainedTasks.isEmpty()) {
+                       if (accumulators == null) {
+                               accumulators = new HashMap<String, 
Accumulator<?, ?>>();
+                       }
+                       
+                       for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
+                               RuntimeContext rc = 
FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
+                               if (rc != null) {
+                                       Map<String, Accumulator<?, ?>> 
chainedAccumulators = rc.getAllAccumulators();
+                                       if (chainedAccumulators != null) {
+                                               
AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+                                       }
+                               }
                        }
                }
 
                // Don't report if the UDF didn't collect any accumulators
-               if (accumulators.size() == 0) {
+               if (accumulators == null || accumulators.size() == 0) {
                        return;
                }
 
@@ -592,9 +602,11 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                // (e.g. in iterations) and we don't want to count twice. This 
may not be
                // done before sending
                AccumulatorHelper.resetAndClearAccumulators(accumulators);
+               
                for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-                       if 
(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
-                               
AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(),
 null).getAllAccumulators());
+                       RuntimeContext rc = 
FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
+                       if (rc != null) {
+                               
AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators());
                        }
                }
        }
@@ -1140,7 +1152,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                        } catch (InterruptedException iex) {
                                throw new RuntimeException("Interrupted while 
waiting for input " + index + " to become available.");
                        } catch (IOException ioex) {
-                               throw new RuntimeException("An I/O Exception 
occurred whily obaining input " + index + ".");
+                               throw new RuntimeException("An I/O Exception 
occurred while obtaining input " + index + ".");
                        }
                }
        }

Reply via email to