http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 4381fd0..29efc4c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.api.writer; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.task.AbstractEvent; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; @@ -49,6 +50,12 @@ public class RecordWriter<T extends IOReadableWritable> { private final int numChannels; + /** + * Counter for the number of records emitted and for the number of bytes written. 
+ * @param counter + */ + private AccumulatorRegistry.Reporter reporter; + /** {@link RecordSerializer} per outgoing channel */ private final RecordSerializer<T>[] serializers; @@ -81,6 +88,7 @@ public class RecordWriter<T extends IOReadableWritable> { synchronized (serializer) { SerializationResult result = serializer.addRecord(record); + while (result.isFullBuffer()) { Buffer buffer = serializer.getCurrentBuffer(); @@ -90,8 +98,18 @@ public class RecordWriter<T extends IOReadableWritable> { } buffer = writer.getBufferProvider().requestBufferBlocking(); + if (reporter != null) { + // increase the number of written bytes by the memory segment's size + reporter.reportNumBytesOut(buffer.getSize()); + } + result = serializer.setNextBuffer(buffer); } + + if(reporter != null) { + // count number of emitted records + reporter.reportNumRecordsOut(1); + } } } } @@ -173,4 +191,14 @@ public class RecordWriter<T extends IOReadableWritable> { } } } + + /** + * Counter for the number of records emitted and the records processed. + */ + public void setReporter(AccumulatorRegistry.Reporter reporter) { + for(RecordSerializer<?> serializer : serializers) { + serializer.setReporter(reporter); + } + } + }
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java index 9c5fdca..72434e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.iterative.task; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.aggregators.Aggregator; @@ -52,6 +53,8 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.MutableObjectIterator; import java.io.IOException; +import java.io.Serializable; +import java.util.Map; /** * The abstract base class for all tasks able to participate in an iteration. 
@@ -166,7 +169,7 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig()); + env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), this.accumulatorMap); } // -------------------------------------------------------------------------------------------- @@ -356,8 +359,10 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext { - public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig); + public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, + ExecutionConfig executionConfig, + Map<String, Accumulator<?,?>> accumulatorMap) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulatorMap); } @Override @@ -375,6 +380,14 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends public <T extends Value> T getPreviousIterationAggregate(String name) { return (T) getIterationAggregators().getPreviousGlobalAggregate(name); } + + @Override + public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> newAccumulator) { + // only add accumulator on first iteration + if (inFirstIteration()) { + super.addAccumulator(name, newAccumulator); + } + } } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java index cf02bdf..9cb045f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.slf4j.Logger; @@ -112,8 +113,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>(); final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig(); final ClassLoader userCodeClassLoader = getUserCodeClassLoader(); + AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter(); this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig, - userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs()); + userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter); // sanity check the setup final int writersIntoStepFunction = this.eventualOutputs.size(); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index 85dd5c5..df41672 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -26,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This is the abstract base class for every task that can be executed ba a TaskManager. + * This is the abstract base class for every task that can be executed by a TaskManager. * Concrete tasks like the stream vertices of the batch tasks * (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit from this class. * http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java deleted file mode 100644 index c824232..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.accumulators; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; - -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.util.SerializedValue; - -/** - * This class manages the accumulators for different jobs. Either the jobs are - * running and new accumulator results have to be merged in, or the jobs are no - * longer running and the results shall be still available for the client or the - * web interface. Accumulators for older jobs are automatically removed when new - * arrive, based on a maximum number of entries. - * - * All functions are thread-safe and thus can be called directly from - * JobManager. - */ -public class AccumulatorManager { - - /** Map of accumulators belonging to recently started jobs */ - private final Map<JobID, JobAccumulators> jobAccumulators = new HashMap<JobID, JobAccumulators>(); - - private final LinkedList<JobID> lru = new LinkedList<JobID>(); - private int maxEntries; - - - public AccumulatorManager(int maxEntries) { - this.maxEntries = maxEntries; - } - - /** - * Merges the new accumulators with the existing accumulators collected for - * the job. 
- */ - public void processIncomingAccumulators(JobID jobID, - Map<String, Accumulator<?, ?>> newAccumulators) { - synchronized (this.jobAccumulators) { - - JobAccumulators jobAccumulators = this.jobAccumulators.get(jobID); - if (jobAccumulators == null) { - jobAccumulators = new JobAccumulators(); - this.jobAccumulators.put(jobID, jobAccumulators); - cleanup(jobID); - } - jobAccumulators.processNew(newAccumulators); - } - } - - public Map<String, Object> getJobAccumulatorResults(JobID jobID) { - Map<String, Object> result = new HashMap<String, Object>(); - - JobAccumulators acc; - synchronized (jobAccumulators) { - acc = jobAccumulators.get(jobID); - } - - if (acc != null) { - for (Map.Entry<String, Accumulator<?, ?>> entry : acc.getAccumulators().entrySet()) { - result.put(entry.getKey(), entry.getValue().getLocalValue()); - } - } - - return result; - } - - public Map<String, SerializedValue<Object>> getJobAccumulatorResultsSerialized(JobID jobID) throws IOException { - JobAccumulators acc; - synchronized (jobAccumulators) { - acc = jobAccumulators.get(jobID); - } - - if (acc == null || acc.getAccumulators().isEmpty()) { - return Collections.emptyMap(); - } - - Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>(); - for (Map.Entry<String, Accumulator<?, ?>> entry : acc.getAccumulators().entrySet()) { - result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue())); - } - - return result; - } - - public StringifiedAccumulatorResult[] getJobAccumulatorResultsStringified(JobID jobID) throws IOException { - JobAccumulators acc; - synchronized (jobAccumulators) { - acc = jobAccumulators.get(jobID); - } - - if (acc == null || acc.getAccumulators().isEmpty()) { - return new StringifiedAccumulatorResult[0]; - } - - Map<String, Accumulator<?, ?>> accMap = acc.getAccumulators(); - - StringifiedAccumulatorResult[] result = new StringifiedAccumulatorResult[accMap.size()]; - int i = 0; - for (Map.Entry<String, 
Accumulator<?, ?>> entry : accMap.entrySet()) { - String type = entry.getValue() == null ? "(null)" : entry.getValue().getClass().getSimpleName(); - String value = entry.getValue() == null ? "(null)" : entry.getValue().toString(); - result[i++] = new StringifiedAccumulatorResult(entry.getKey(), type, value); - } - return result; - } - - /** - * Cleanup data for the oldest jobs if the maximum number of entries is reached. - * - * @param jobId The (potentially new) JobId. - */ - private void cleanup(JobID jobId) { - if (!lru.contains(jobId)) { - lru.addFirst(jobId); - } - if (lru.size() > this.maxEntries) { - JobID toRemove = lru.removeLast(); - this.jobAccumulators.remove(toRemove); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java deleted file mode 100644 index 970d993..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.accumulators; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.AccumulatorHelper; - -/** - * Simple class wrapping a map of accumulators for a single job. Just for better - * handling. - */ -public class JobAccumulators { - - private final Map<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>(); - - public Map<String, Accumulator<?, ?>> getAccumulators() { - return this.accumulators; - } - - public void processNew(Map<String, Accumulator<?, ?>> newAccumulators) { - AccumulatorHelper.mergeInto(this.accumulators, newAccumulators); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 345e1ab..b3130a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.api.reader.MutableReader; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; @@ -356,6 +357,11 @@ public class DataSinkTask<IT> extends AbstractInvokable { throw new Exception("Illegal input group size in task configuration: " + groupSize); } + final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); + final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); + + inputReader.setReporter(reporter); + this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader()); @SuppressWarnings({ "rawtypes" }) final MutableObjectIterator<?> iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer()); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 0bbe4bf..3f1c642 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.CancelTaskException; import 
org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -39,9 +40,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; /** @@ -187,25 +188,22 @@ public class DataSourceTask<OT> extends AbstractInvokable { format.close(); } } // end for all input splits - + // close the collector. if it is a chaining task collector, it will close its chained tasks this.output.close(); - + // close all chained tasks letting them report failure RegularPactTask.closeChainedTasks(this.chainedTasks, this); - - // Merge and report accumulators - RegularPactTask.reportAndClearAccumulators(getEnvironment(), - new HashMap<String, Accumulator<?,?>>(), chainedTasks); + } catch (Exception ex) { // close the input, but do not report any exceptions, since we already have another root cause try { this.format.close(); } catch (Throwable ignored) {} - + RegularPactTask.cancelChainedTasks(this.chainedTasks); - + ex = ExceptionInChainedStubException.exceptionUnwrap(ex); if (ex instanceof CancelTaskException) { @@ -275,7 +273,7 @@ public class DataSourceTask<OT> extends AbstractInvokable { catch (Throwable t) { throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t); } - + // get the factory for the type serializer this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader); } @@ -287,7 +285,14 @@ public class DataSourceTask<OT> extends AbstractInvokable { private void initOutputs(ClassLoader cl) throws Exception { this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>(); this.eventualOutputs = new ArrayList<RecordWriter<?>>(); - this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs, getExecutionConfig()); + + final AccumulatorRegistry 
accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); + final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); + + Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap(); + + this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs, + getExecutionConfig(), reporter, accumulatorMap); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java index d0f4116..a53f5bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java @@ -74,7 +74,7 @@ public interface PactDriver<S extends Function, OT> { * code typically signal situations where this instance in unable to proceed, exceptions * from the user code should be forwarded. */ - void run() throws Exception; + void run() throws Exception; /** * This method is invoked in any case (clean termination and exception) at the end of the tasks operation. 
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java index b296506..bc23fa3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java @@ -50,7 +50,7 @@ public interface PactTaskContext<S, OT> { MemoryManager getMemoryManager(); IOManager getIOManager(); - + <X> MutableObjectIterator<X> getInput(int index); <X> TypeSerializerFactory<X> getInputSerializer(int index); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 1c3328e..78bf383 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -20,18 +20,17 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Partitioner; -import 
org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -71,7 +70,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -114,7 +112,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i protected List<RecordWriter<?>> eventualOutputs; /** - * The input readers to this task. + * The input readers of this task. */ protected MutableReader<?>[] inputReaders; @@ -212,7 +210,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i */ protected volatile boolean running = true; - + /** + * The accumulator map used in the RuntimeContext. 
+ */ + protected Map<String, Accumulator<?,?>> accumulatorMap; + // -------------------------------------------------------------------------------------------- // Task Interface // -------------------------------------------------------------------------------------------- @@ -273,7 +275,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i LOG.debug(formatLogString("Start task code.")); } - this.runtimeUdfContext = createRuntimeContext(getEnvironment().getTaskName()); + Environment env = getEnvironment(); + + this.runtimeUdfContext = createRuntimeContext(env.getTaskName()); // whatever happens in this scope, make sure that the local strategies are cleaned up! // note that the initialization of the local strategies is in the try-finally block as well, @@ -367,6 +371,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i clearReaders(inputReaders); clearWriters(eventualOutputs); + } if (this.running) { @@ -505,18 +510,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // close all chained tasks letting them report failure RegularPactTask.closeChainedTasks(this.chainedTasks, this); - - // Collect the accumulators of all involved UDFs and send them to the - // JobManager. 
close() has been called earlier for all involved UDFs - // (using this.stub.close() and closeChainedTasks()), so UDFs can no longer - // modify accumulators; - - // collect the counters from the udf in the core driver - Map<String, Accumulator<?, ?>> accumulators = - FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(); - - // collect accumulators from chained tasks and report them - reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks); } catch (Exception ex) { // close the input, but do not report any exceptions, since we already have another root cause @@ -557,60 +550,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i } } - /** - * This method is called at the end of a task, receiving the accumulators of - * the task and the chained tasks. It merges them into a single map of - * accumulators and sends them to the JobManager. - * - * @param chainedTasks - * Each chained task might have accumulators which will be merged - * with the accumulators of the stub. - */ - protected static void reportAndClearAccumulators(Environment env, - Map<String, Accumulator<?, ?>> accumulators, - ArrayList<ChainedDriver<?, ?>> chainedTasks) { - - // We can merge here the accumulators from the stub and the chained - // tasks. Type conflicts can occur here if counters with same name but - // different type were used. 
- - if (!chainedTasks.isEmpty()) { - if (accumulators == null) { - accumulators = new HashMap<String, Accumulator<?, ?>>(); - } - - for (ChainedDriver<?, ?> chainedTask : chainedTasks) { - RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null); - if (rc != null) { - Map<String, Accumulator<?, ?>> chainedAccumulators = rc.getAllAccumulators(); - if (chainedAccumulators != null) { - AccumulatorHelper.mergeInto(accumulators, chainedAccumulators); - } - } - } - } - - // Don't report if the UDF didn't collect any accumulators - if (accumulators == null || accumulators.size() == 0) { - return; - } - - // Report accumulators to JobManager - env.reportAccumulators(accumulators); - - // We also clear the accumulators, since stub instances might be reused - // (e.g. in iterations) and we don't want to count twice. This may not be - // done before sending - AccumulatorHelper.resetAndClearAccumulators(accumulators); - - for (ChainedDriver<?, ?> chainedTask : chainedTasks) { - RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null); - if (rc != null) { - AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators()); - } - } - } - protected void closeLocalStrategiesAndCaches() { // make sure that all broadcast variable references held by this task are released @@ -725,6 +664,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i int currentReaderOffset = 0; + AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); + AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); + for (int i = 0; i < numInputs; i++) { // ---------------- create the input readers --------------------- // in case where a logical input unions multiple physical inputs, create a union reader @@ -744,6 +686,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i throw new Exception("Illegal input group size in task configuration: " + 
groupSize); } + inputReaders[i].setReporter(reporter); + currentReaderOffset += groupSize; } this.inputReaders = inputReaders; @@ -1073,14 +1017,21 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs, this.getExecutionConfig()); + AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); + AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); + + this.accumulatorMap = accumulatorRegistry.getUserMap(); + + this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs, + this.getExecutionConfig(), reporter, this.accumulatorMap); } public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); + return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), - env.getDistributedCacheEntries()); + env.getDistributedCacheEntries(), this.accumulatorMap); } // -------------------------------------------------------------------------------------------- @@ -1257,7 +1208,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i * @return The OutputCollector that data produced in this task is submitted to. 
*/ public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, - List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) throws Exception + List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws Exception { if (numOutputs == 0) { return null; @@ -1286,11 +1237,15 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i } final DataDistribution distribution = config.getOutputDataDistribution(i, cl); final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl); - + oe = new RecordOutputEmitter(strategy, comparator, partitioner, distribution); } - writers.add(new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe)); + // setup accumulator counters + final RecordWriter<Record> recordWriter = new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe); + recordWriter.setReporter(reporter); + + writers.add(recordWriter); } if (eventualOutputs != null) { eventualOutputs.addAll(writers); @@ -1318,12 +1273,18 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i else { final DataDistribution dataDist = config.getOutputDataDistribution(i, cl); final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl); - + final TypeComparator<T> comparator = compFactory.createComparator(); oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist); } - writers.add(new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe)); + final RecordWriter<SerializationDelegate<T>> recordWriter = + new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe); + + // setup live accumulator counters + recordWriter.setReporter(reporter); + + writers.add(recordWriter); } if (eventualOutputs != null) { eventualOutputs.addAll(writers); @@ -1338,7 +1299,11 @@ public class 
RegularPactTask<S extends Function, OT> extends AbstractInvokable i */ @SuppressWarnings("unchecked") public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader cl, TaskConfig config, - List<ChainedDriver<?, ?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig) + List<ChainedDriver<?, ?>> chainedTasksTarget, + List<RecordWriter<?>> eventualOutputs, + ExecutionConfig executionConfig, + AccumulatorRegistry.Reporter reporter, + Map<String, Accumulator<?,?>> accumulatorMap) throws Exception { final int numOutputs = config.getNumOutputs(); @@ -1370,12 +1335,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i final TaskConfig chainedStubConf = config.getChainedStubConfig(i); final String taskName = config.getChainedTaskName(i); - if (i == numChained -1) { + if (i == numChained - 1) { // last in chain, instantiate the output collector for this task - previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs()); + previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter); } - ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig); + ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig, accumulatorMap); chainedTasksTarget.add(0, ct); previous = ct; @@ -1386,7 +1351,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // else // instantiate the output collector the default way from this configuration - return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs); + return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs, reporter); } // -------------------------------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index b4cfa27..ea6cfe3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators.chaining; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.runtime.execution.Environment; @@ -28,6 +29,8 @@ import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; +import java.util.Map; + /** * The interface to be implemented by drivers that do not run in an own pact task context, but are chained to other * tasks. 
@@ -50,20 +53,23 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector, - AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) + AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, + Map<String, Accumulator<?,?>> accumulatorMap) { this.config = config; this.taskName = taskName; this.outputCollector = outputCollector; this.userCodeClassLoader = userCodeClassLoader; - + + Environment env = parent.getEnvironment(); + if (parent instanceof RegularPactTask) { this.udfContext = ((RegularPactTask<?, ?>) parent).createRuntimeContext(taskName); } else { - Environment env = parent.getEnvironment(); this.udfContext = new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, parent.getExecutionConfig(), - env.getDistributedCacheEntries()); + env.getDistributedCacheEntries(), accumulatorMap + ); } this.executionConfig = executionConfig; http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java index f4cd354..4b480ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import 
org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext; @@ -41,12 +42,14 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>(); - public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig); + public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, + ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators); } - public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks); + public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, + ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks); } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 5ab0150..f166c36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -21,10 +21,9 @@ package org.apache.flink.runtime.taskmanager; import akka.actor.ActorRef; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.accumulators.AccumulatorEvent; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -34,12 +33,10 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.util.SerializedValue; -import java.io.IOException; import java.util.Map; import java.util.concurrent.Future; @@ -76,7 +73,9 @@ public class RuntimeEnvironment implements Environment { private final InputGate[] inputGates; private final ActorRef jobManagerActor; - + + private final AccumulatorRegistry accumulatorRegistry; + // ------------------------------------------------------------------------ public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId, @@ -86,6 +85,7 @@ public class RuntimeEnvironment implements 
Environment { ClassLoader userCodeClassLoader, MemoryManager memManager, IOManager ioManager, BroadcastVariableManager bcVarManager, + AccumulatorRegistry accumulatorRegistry, InputSplitProvider splitProvider, Map<String, Future<Path>> distCacheEntries, ResultPartitionWriter[] writers, @@ -93,7 +93,7 @@ public class RuntimeEnvironment implements Environment { ActorRef jobManagerActor) { checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism); - + this.jobId = checkNotNull(jobId); this.jobVertexId = checkNotNull(jobVertexId); this.executionId = checkNotNull(executionId); @@ -107,6 +107,7 @@ public class RuntimeEnvironment implements Environment { this.memManager = checkNotNull(memManager); this.ioManager = checkNotNull(ioManager); this.bcVarManager = checkNotNull(bcVarManager); + this.accumulatorRegistry = checkNotNull(accumulatorRegistry); this.splitProvider = checkNotNull(splitProvider); this.distCacheEntries = checkNotNull(distCacheEntries); this.writers = checkNotNull(writers); @@ -183,6 +184,11 @@ public class RuntimeEnvironment implements Environment { } @Override + public AccumulatorRegistry getAccumulatorRegistry() { + return accumulatorRegistry; + } + + @Override public InputSplitProvider getInputSplitProvider() { return splitProvider; } @@ -213,20 +219,6 @@ public class RuntimeEnvironment implements Environment { } @Override - public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) { - AccumulatorEvent evt; - try { - evt = new AccumulatorEvent(getJobID(), accumulators); - } - catch (IOException e) { - throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e); - } - - ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt); - jobManagerActor.tell(accResult, ActorRef.noSender()); - } - - @Override public void acknowledgeCheckpoint(long checkpointId) { acknowledgeCheckpoint(checkpointId, null); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 616998c..13a2ace 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -100,10 +101,10 @@ public class Task implements Runnable { /** The class logger. 
*/ private static final Logger LOG = LoggerFactory.getLogger(Task.class); - + /** The tread group that contains all task threads */ private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads"); - + /** For atomic state updates */ private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState"); @@ -176,13 +177,16 @@ public class Task implements Runnable { /** The library cache, from which the task can request its required JAR files */ private final LibraryCacheManager libraryCache; - + /** The cache for user-defined files that the invokable requires */ private final FileCache fileCache; - + /** The gateway to the network stack, which handles inputs and produced results */ private final NetworkEnvironment network; + /** The registry of this task which enables live reporting of accumulators */ + private final AccumulatorRegistry accumulatorRegistry; + /** The thread that executes the task */ private final Thread executingThread; @@ -194,10 +198,10 @@ public class Task implements Runnable { /** atomic flag that makes sure the invokable is canceled exactly once upon error */ private final AtomicBoolean invokableHasBeenCanceled; - + /** The invokable of this task, if initialized */ private volatile AbstractInvokable invokable; - + /** The current execution state of the task */ private volatile ExecutionState executionState = ExecutionState.CREATED; @@ -245,12 +249,13 @@ public class Task implements Runnable { this.memoryManager = checkNotNull(memManager); this.ioManager = checkNotNull(ioManager); - this.broadcastVariableManager =checkNotNull(bcVarManager); + this.broadcastVariableManager = checkNotNull(bcVarManager); + this.accumulatorRegistry = new AccumulatorRegistry(jobId, executionId); this.jobManager = checkNotNull(jobManagerActor); this.taskManager = checkNotNull(taskManagerActor); this.actorAskTimeout = new 
Timeout(checkNotNull(actorAskTimeout)); - + this.libraryCache = checkNotNull(libraryCache); this.fileCache = checkNotNull(fileCache); this.network = checkNotNull(networkEnvironment); @@ -361,6 +366,10 @@ public class Task implements Runnable { return inputGatesById.get(id); } + public AccumulatorRegistry getAccumulatorRegistry() { + return accumulatorRegistry; + } + public Thread getExecutingThread() { return executingThread; } @@ -499,7 +508,8 @@ public class Task implements Runnable { Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, taskName, taskNameWithSubtask, subtaskIndex, parallelism, jobConfiguration, taskConfiguration, - userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, + userCodeClassLoader, memoryManager, ioManager, + broadcastVariableManager, accumulatorRegistry, splitProvider, distributedCacheEntries, writers, inputGates, jobManager); @@ -518,7 +528,7 @@ public class Task implements Runnable { // get our private reference onto the stack (be safe against concurrent changes) SerializedValue<StateHandle<?>> operatorState = this.operatorState; - + if (operatorState != null) { if (invokable instanceof OperatorStateCarrier) { try { @@ -553,7 +563,7 @@ public class Task implements Runnable { if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { throw new CancelTaskException(); } - + // notify everyone that we switched to running. especially the TaskManager needs // to know this! notifyObservers(ExecutionState.RUNNING, null); @@ -653,7 +663,7 @@ public class Task implements Runnable { finally { try { LOG.info("Freeing task resources for " + taskNameWithSubtask); - + // stop the async dispatcher. 
// copy dispatcher reference to stack, against concurrent release ExecutorService dispatcher = this.asyncCallDispatcher; @@ -867,15 +877,15 @@ public class Task implements Runnable { */ public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) { AbstractInvokable invokable = this.invokable; - + if (executionState == ExecutionState.RUNNING && invokable != null) { if (invokable instanceof CheckpointedOperator) { - + // build a local closure final CheckpointedOperator checkpointer = (CheckpointedOperator) invokable; final Logger logger = LOG; final String taskName = taskNameWithSubtask; - + Runnable runnable = new Runnable() { @Override public void run() { @@ -1038,7 +1048,7 @@ public class Task implements Runnable { public String toString() { return getTaskNameWithSubtasks() + " [" + executionState + ']'; } - + // ------------------------------------------------------------------------ // Task Names // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java index 6c85ab5..0637017 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import java.util.Arrays; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; @@ -52,8 +53,11 @@ 
public class TaskExecutionState implements java.io.Serializable { // class may not be part of the system class loader. private transient Throwable cachedError; + /** Serialized flink and user-defined accumulators */ + private final AccumulatorSnapshot accumulators; + /** - * Creates a new task execution state update, with no attached exception. + * Creates a new task execution state update, with no attached exception and no accumulators. * * @param jobID * the ID of the job the task belongs to @@ -63,13 +67,28 @@ public class TaskExecutionState implements java.io.Serializable { * the execution state to be reported */ public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState) { - this(jobID, executionId, executionState, null); + this(jobID, executionId, executionState, null, null); } - + + /** + * Creates a new task execution state update, with an attached exception but no accumulators. + * + * @param jobID + * the ID of the job the task belongs to + * @param executionId + * the ID of the task execution whose state is to be reported + * @param executionState + * the execution state to be reported + */ + public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, + ExecutionState executionState, Throwable error) { + this(jobID, executionId, executionState, error, null); + } + /** * Creates a new task execution state update, with an attached exception. * This constructor may never throw an exception. - * + * * @param jobID * the ID of the job the task belongs to * @param executionId @@ -78,11 +97,15 @@ public class TaskExecutionState implements java.io.Serializable { * the execution state to be reported * @param error * an optional error + * @param accumulators + * The flink and user-defined accumulators which may be null. 
*/ public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, - ExecutionState executionState, Throwable error) { + ExecutionState executionState, Throwable error, + AccumulatorSnapshot accumulators) { - if (jobID == null || executionId == null || executionState == null) { + + if (jobID == null || executionId == null || executionState == null) { throw new NullPointerException(); } @@ -90,6 +113,7 @@ public class TaskExecutionState implements java.io.Serializable { this.executionId = executionId; this.executionState = executionState; this.cachedError = error; + this.accumulators = accumulators; if (error != null) { byte[] serializedError; @@ -178,6 +202,13 @@ public class TaskExecutionState implements java.io.Serializable { return this.jobID; } + /** + * Gets flink and user-defined accumulators in serialized form. + */ + public AccumulatorSnapshot getAccumulators() { + return accumulators; + } + // -------------------------------------------------------------------------------------------- @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java index f5e897b..6a5468a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java @@ -29,7 +29,7 @@ import java.util.Arrays; * special class loader, the deserialization fails with a {@code ClassNotFoundException}. * * To work around that issue, the SerializedValue serialized data immediately into a byte array. 
- * When send through RPC or another service that uses serialization, the only the byte array is + * When sent through RPC or another service that uses serialization, only the byte array is * transferred. The object is deserialized later (upon access) and requires the accessor to * provide the corresponding class loader. * http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 3b4ce15..8823041 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -29,39 +29,39 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult -import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.client._ -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} -import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} -import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager -import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged -import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} -import org.apache.flink.runtime.messages.RegistrationMessages._ -import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace} import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState} import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint} -import org.apache.flink.runtime.net.NetUtils import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.taskmanager.TaskManager -import org.apache.flink.runtime.util.{EnvironmentInformation, SerializedValue, ZooKeeperUtil} -import org.apache.flink.runtime.{ActorLogMessages, ActorSynchronousLogging, StreamingMode} +import org.apache.flink.runtime.util.ZooKeeperUtil +import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation} +import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages} +import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus} +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} +import org.apache.flink.runtime.messages.JobManagerMessages._ +import org.apache.flink.runtime.messages.RegistrationMessages._ +import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat} import org.apache.flink.util.{ExceptionUtils, InstantiationUtil} -import scala.collection.JavaConverters._ import 
scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.forkjoin.ForkJoinPool import scala.language.postfixOps +import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ /** * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the @@ -97,7 +97,6 @@ class JobManager( protected val scheduler: FlinkScheduler, protected val libraryCacheManager: BlobLibraryCacheManager, protected val archive: ActorRef, - protected val accumulatorManager: AccumulatorManager, protected val defaultExecutionRetries: Int, protected val delayBetweenRetries: Long, protected val timeout: FiniteDuration, @@ -221,7 +220,6 @@ class JobManager( originalSender ! result }(context.dispatcher) - sender ! true case None => log.error("Cannot find execution graph for ID " + s"${taskExecutionState.getJobID} to change state to " + s"${taskExecutionState.getExecutionState}.") @@ -298,7 +296,7 @@ class JobManager( newJobStatus match { case JobStatus.FINISHED => val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try { - accumulatorManager.getJobAccumulatorResultsSerialized(jobID) + executionGraph.getAccumulatorsSerialized } catch { case e: Exception => log.error(s"Cannot fetch serialized accumulators for job $jobID", e) @@ -402,13 +400,22 @@ class JobManager( import scala.collection.JavaConverters._ sender ! 
RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala) - case Heartbeat(instanceID, metricsReport) => - try { - log.debug(s"Received hearbeat message from $instanceID.") - instanceManager.reportHeartBeat(instanceID, metricsReport) - } catch { - case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t) - } + case Heartbeat(instanceID, metricsReport, accumulators) => + log.debug(s"Received heartbeat message from $instanceID.") + + Future { + accumulators foreach { + case accumulators => + currentJobs.get(accumulators.getJobID) match { + case Some((jobGraph, jobInfo)) => + jobGraph.updateAccumulators(accumulators) + case None => + // ignore accumulator values for old job + } + } + }(context.dispatcher) + + instanceManager.reportHeartBeat(instanceID, metricsReport) case message: AccumulatorMessage => handleAccumulatorMessage(message) @@ -676,33 +683,18 @@ class JobManager( * @param message The accumulator message. */ private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = { - message match { - case ReportAccumulatorResult(jobId, _, accumulatorEvent) => - val classLoader = try { - libraryCacheManager.getClassLoader(jobId) - } catch { - case e: Exception => - log.error("Dropping accumulators. No class loader available for job " + jobId, e) - return - } - - if (classLoader != null) { - try { - val accumulators = accumulatorEvent.deserializeValue(classLoader) - accumulatorManager.processIncomingAccumulators(jobId, accumulators) - } - catch { - case e: Exception => log.error("Cannot update accumulators for job " + jobId, e) - } - } else { - log.error("Dropping accumulators. 
No class loader available for job " + jobId) - } case RequestAccumulatorResults(jobID) => try { - val accumulatorValues: java.util.Map[String, SerializedValue[Object]] = - accumulatorManager.getJobAccumulatorResultsSerialized(jobID) + val accumulatorValues: java.util.Map[String, SerializedValue[Object]] = { + currentJobs.get(jobID) match { + case Some((graph, jobInfo)) => + graph.getAccumulatorsSerialized + case None => + null // TODO check also archive + } + } sender() ! AccumulatorResultsFound(jobID, accumulatorValues) } @@ -714,8 +706,31 @@ class JobManager( case RequestAccumulatorResultsStringified(jobId) => try { - val accumulatorValues: Array[StringifiedAccumulatorResult] = - accumulatorManager.getJobAccumulatorResultsStringified(jobId) + val accumulatorValues: Array[StringifiedAccumulatorResult] = { + currentJobs.get(jobId) match { + case Some((graph, jobInfo)) => + val accumulators = graph.aggregateUserAccumulators() + + val result: Array[StringifiedAccumulatorResult] = new + Array[StringifiedAccumulatorResult](accumulators.size) + + var i = 0 + accumulators foreach { + case (name, accumulator) => + val (typeString, valueString) = + if (accumulator != null) { + (accumulator.getClass.getSimpleName, accumulator.toString) + } else { + (null, null) + } + result(i) = new StringifiedAccumulatorResult(name, typeString, valueString) + i += 1 + } + result + case None => + null // TODO check also archive + } + } sender() ! 
AccumulatorResultStringsFound(jobId, accumulatorValues) } @@ -1058,7 +1073,7 @@ object JobManager { */ def createJobManagerComponents(configuration: Configuration) : (ExecutionContext, InstanceManager, FlinkScheduler, BlobLibraryCacheManager, - Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = { + Props, Int, Long, FiniteDuration, Int) = { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) @@ -1091,8 +1106,6 @@ object JobManager { val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount) - val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount)) - val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool()) var blobServer: BlobServer = null @@ -1131,7 +1144,6 @@ object JobManager { scheduler, libraryCacheManager, archiveProps, - accumulatorManager, executionRetries, delayBetweenRetries, timeout, @@ -1179,7 +1191,6 @@ object JobManager { scheduler, libraryCacheManager, archiveProps, - accumulatorManager, executionRetries, delayBetweenRetries, timeout, @@ -1199,7 +1210,6 @@ object JobManager { scheduler, libraryCacheManager, archiver, - accumulatorManager, executionRetries, delayBetweenRetries, timeout, http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala index b12f1b5..6cb571c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala @@ -18,6 +18,7 @@ package org.apache.flink.runtime.messages +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import 
org.apache.flink.runtime.instance.InstanceID /** @@ -52,8 +53,10 @@ object TaskManagerMessages { * * @param instanceID The instance ID of the reporting TaskManager. * @param metricsReport utf-8 encoded JSON metrics report from the metricRegistry. + * @param accumulators Accumulators of tasks serialized as Tuple2[internal, user-defined] */ - case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte]) + case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte], + accumulators: Seq[AccumulatorSnapshot]) // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala index 82c4ab6..015c96e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala @@ -19,8 +19,7 @@ package org.apache.flink.runtime.messages.accumulators import org.apache.flink.api.common.JobID -import org.apache.flink.runtime.accumulators.{StringifiedAccumulatorResult, AccumulatorEvent} -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult import org.apache.flink.runtime.util.SerializedValue /** @@ -38,18 +37,6 @@ sealed trait AccumulatorMessage { sealed trait AccumulatorResultsResponse extends AccumulatorMessage /** - * Reports the accumulator results of the individual tasks to the job manager. 
- * - * @param jobID The ID of the job the accumulator belongs to - * @param executionId The ID of the task execution that the accumulator belongs to. - * @param accumulatorEvent The serialized accumulators - */ -case class ReportAccumulatorResult(jobID: JobID, - executionId: ExecutionAttemptID, - accumulatorEvent: AccumulatorEvent) - extends AccumulatorMessage - -/** * Requests the accumulator results of the job identified by [[jobID]] from the job manager. * The result is sent back to the sender as a [[AccumulatorResultsResponse]] message. * http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 1a35d01..f07fa0c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -37,6 +37,7 @@ import grizzled.slf4j.Logger import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException} import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage} +import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, AccumulatorRegistry} import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.blob.{BlobService, BlobCache} @@ -68,6 +69,7 @@ import scala.concurrent.duration._ import scala.concurrent.forkjoin.ForkJoinPool import scala.util.{Failure, Success} import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ import scala.language.postfixOps @@ 
-328,7 +330,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { network.getPartitionManager.releasePartitionsProducedBy(executionID) } catch { case t: Throwable => killTaskManagerFatal( - "Fatal leak: Unable to release intermediate result partition data", t) + "Fatal leak: Unable to release intermediate result partition data", t) } } @@ -389,7 +391,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { } else { log.debug(s"Cannot find task to cancel for execution ${executionID})") sender ! new TaskOperationResult(executionID, false, - "No task with that execution ID was found.") + "No task with that execution ID was found.") } case PartitionState(taskExecutionId, taskResultId, partitionId, state) => @@ -400,7 +402,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { log.debug(s"Cannot find task $taskExecutionId to respond with partition state.") } } - } + } } /** @@ -793,12 +795,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // create the task. 
this does not grab any TaskManager resources or download // and libraries - the operation does not block - val execId = tdd.getExecutionId val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager, self, jobManagerActor, config.timeout, libCache, fileCache) log.info(s"Received task ${task.getTaskNameWithSubtasks}") - + + val execId = tdd.getExecutionId // add the task to the map val prevTask = runningTasks.put(execId, task) if (prevTask != null) { @@ -898,22 +900,28 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { val task = runningTasks.remove(executionID) if (task != null) { - - // the task must be in a terminal state - if (!task.getExecutionState.isTerminal) { - try { - task.failExternally(new Exception("Task is being removed from TaskManager")) - } catch { - case e: Exception => log.error("Could not properly fail task", e) - } + + // the task must be in a terminal state + if (!task.getExecutionState.isTerminal) { + try { + task.failExternally(new Exception("Task is being removed from TaskManager")) + } catch { + case e: Exception => log.error("Could not properly fail task", e) } + } + + log.info(s"Unregistering task and sending final execution state " + + s"${task.getExecutionState} to JobManager for task ${task.getTaskName} " + + s"(${task.getExecutionId})") - log.info(s"Unregistering task and sending final execution state " + - s"${task.getExecutionState} to JobManager for task ${task.getTaskName} " + - s"(${task.getExecutionId})") + val accumulators = { + val registry = task.getAccumulatorRegistry + registry.getSnapshot + } - self ! UpdateTaskExecutionState(new TaskExecutionState( - task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause)) + self ! 
UpdateTaskExecutionState(new TaskExecutionState( + task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause, + accumulators)) } else { log.error(s"Cannot find task with ID $executionID to unregister.") @@ -931,9 +939,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { private def sendHeartbeatToJobManager(): Unit = { try { log.debug("Sending heartbeat to JobManager") - val report: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry) - currentJobManager foreach { - jm => jm ! Heartbeat(instanceID, report) + val metricsReport: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry) + + val accumulatorEvents = + scala.collection.mutable.Buffer[AccumulatorSnapshot]() + + runningTasks foreach { + case (execID, task) => + val registry = task.getAccumulatorRegistry + val accumulators = registry.getSnapshot + accumulatorEvents.append(accumulators) + } + + currentJobManager foreach { + jm => jm ! Heartbeat(instanceID, metricsReport, accumulatorEvents) } } catch { http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java index 4e5fb40..14bf022 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.io.network.api.reader; +import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; @@ -183,5 +185,10 @@ public class AbstractReaderTest { protected MockReader(InputGate inputGate) { super(inputGate); } + + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index 9b9609b..0aab5fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -61,7 +61,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re private final IOManager ioManager; private final MemoryManager memManager; - + private final List<MutableObjectIterator<Record>> inputs; private final List<TypeComparator<Record>> comparators; @@ -105,7 +105,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re this.perSortFractionMem = (double)perSortMemory/totalMem; this.ioManager = new IOManagerAsync(); this.memManager = totalMem > 0 ? 
new DefaultMemoryManager(totalMem,1) : null; - + this.inputs = new ArrayList<MutableObjectIterator<Record>>(); this.comparators = new ArrayList<TypeComparator<Record>>(); this.sorters = new ArrayList<UnilateralSortMerger<Record>>(); @@ -295,7 +295,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re public IOManager getIOManager() { return this.ioManager; } - + @Override public MemoryManager getMemoryManager() { return this.memManager; http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 0f62b27..b9cb416 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -79,6 +80,8 @@ public class MockEnvironment implements Environment { private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager(); + private final AccumulatorRegistry accumulatorRegistry; + private final int bufferSize; public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { @@ -91,6 +94,8 @@ public class 
MockEnvironment implements Environment { this.ioManager = new IOManagerAsync(); this.inputSplitProvider = inputSplitProvider; this.bufferSize = bufferSize; + + this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId()); } public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) { @@ -259,8 +264,8 @@ public class MockEnvironment implements Environment { } @Override - public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) { - // discard, this is only for testing + public AccumulatorRegistry getAccumulatorRegistry() { + return this.accumulatorRegistry; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index e9e761c..bd36dd4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -25,6 +25,7 @@ import akka.actor.Props; import com.google.common.collect.Maps; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 219e5ae..f2535fa 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -65,7 +65,6 @@ class TestingCluster(userConfiguration: Configuration, scheduler, libraryCacheManager, _, - accumulatorManager, executionRetries, delayBetweenRetries, timeout, @@ -82,7 +81,6 @@ class TestingCluster(userConfiguration: Configuration, scheduler, libraryCacheManager, archive, - accumulatorManager, executionRetries, delayBetweenRetries, timeout, http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 5747b7e..6d316ca 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -147,6 +147,16 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { case None => sender ! WorkingTaskManager(None) } + case RequestAccumulatorValues(jobID) => + + val (flinkAccumulators, userAccumulators) = currentJobs.get(jobID) match { + case Some((graph, jobInfo)) => + (graph.getFlinkAccumulators, graph.aggregateUserAccumulators) + case None => null + } + + sender ! 
RequestAccumulatorValuesResponse(jobID, flinkAccumulators, userAccumulators) + case NotifyWhenJobStatus(jobID, state) => val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID, scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]()) http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index 241c6c0..46e8486 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -20,9 +20,12 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef import org.apache.flink.api.common.JobID -import org.apache.flink.runtime.executiongraph.ExecutionGraph +import org.apache.flink.runtime.accumulators.AccumulatorRegistry +import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} import org.apache.flink.runtime.instance.InstanceGateway import org.apache.flink.runtime.jobgraph.JobStatus +import java.util.Map +import org.apache.flink.api.common.accumulators.Accumulator object TestingJobManagerMessages { @@ -53,4 +56,9 @@ object TestingJobManagerMessages { case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef) case class TaskManagerTerminated(taskManager: ActorRef) + + case class RequestAccumulatorValues(jobID: JobID) + case class RequestAccumulatorValuesResponse(jobID: JobID, + flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]], + userAccumulators: Map[String, Accumulator[_,_]]) }
