Repository: flink Updated Branches: refs/heads/master 72b5dc980 -> aa6a7f04c
[FLINK-2388] forward message to archivist in case accumulators can't be found - return AccumulatorsNotFound in case the archive cannot find them either This closes #930. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa6a7f04 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa6a7f04 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa6a7f04 Branch: refs/heads/master Commit: aa6a7f04c54a1c3675b1d78a3d0cc885a1362fa9 Parents: 72b5dc9 Author: Maximilian Michels <m...@apache.org> Authored: Wed Jul 22 10:38:43 2015 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Jul 22 16:32:48 2015 +0200 ---------------------------------------------------------------------- .../common/accumulators/AccumulatorHelper.java | 9 --- .../runtime/executiongraph/ExecutionGraph.java | 29 ++++++++ .../flink/runtime/jobmanager/JobManager.scala | 70 +++++--------------- .../runtime/jobmanager/MemoryArchivist.scala | 26 ++++++++ 4 files changed, 73 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java index 3e2e359..3907004 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java @@ -105,15 +105,6 @@ public class AccumulatorHelper { return resultMap; } - public static String getAccumulatorsFormated(Map<?, Accumulator<?, ?>> newAccumulators) { - StringBuilder builder = new StringBuilder(); - for 
(Map.Entry<?, Accumulator<?, ?>> entry : newAccumulators.entrySet()) { - builder.append("- " + entry.getKey() + " (" + entry.getValue().getClass().getName() - + ")" + ": " + entry.getValue().toString() + "\n"); - } - return builder.toString(); - } - public static String getResultsFormated(Map<String, Object> map) { StringBuilder builder = new StringBuilder(); for (Map.Entry<String, Object> entry : map.entrySet()) { http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 677c809..9c977fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.execution.ExecutionState; @@ -580,6 +581,34 @@ public class ExecutionGraph implements Serializable { return result; } + /** + * Returns a stringified version of the user-defined accumulators.
+ * @return an Array containing the StringifiedAccumulatorResult objects + */ + public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() { + + Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators(); + + int num = accumulatorMap.size(); + StringifiedAccumulatorResult[] resultStrings = new StringifiedAccumulatorResult[num]; + + int i = 0; + for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) { + + StringifiedAccumulatorResult result; + Accumulator<?, ?> value = entry.getValue(); + if (value != null) { + result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString()); + } else { + result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null"); + } + + resultStrings[i++] = result; + } + + return resultStrings; + } + // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d8d51ce..c195a78 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -32,7 +32,7 @@ import grizzled.slf4j.Logger import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.io.InputSplitAssigner -import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, StringifiedAccumulatorResult} +import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.client._ import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} @@ -684,65 +684,31 @@ class JobManager( * @param message The accumulator message. */ private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = { - message match { - - case RequestAccumulatorResults(jobID) => - try { - val accumulatorValues: java.util.Map[String, SerializedValue[Object]] = { + message match { + case RequestAccumulatorResults(jobID) => + try { currentJobs.get(jobID) match { case Some((graph, jobInfo)) => - graph.getAccumulatorsSerialized + val accumulatorValues = graph.getAccumulatorsSerialized() + sender() ! AccumulatorResultsFound(jobID, accumulatorValues) case None => - null // TODO check also archive + archive.forward(message) } - } - - sender() ! AccumulatorResultsFound(jobID, accumulatorValues) - } - catch { + } catch { case e: Exception => - log.error("Cannot serialize accumulator result", e) + log.error("Cannot serialize accumulator result.", e) sender() ! AccumulatorResultsErroneous(jobID, e) - } - - case RequestAccumulatorResultsStringified(jobId) => - try { - val accumulatorValues: Array[StringifiedAccumulatorResult] = { - currentJobs.get(jobId) match { - case Some((graph, jobInfo)) => - val accumulators = graph.aggregateUserAccumulators() - - val result: Array[StringifiedAccumulatorResult] = new - Array[StringifiedAccumulatorResult](accumulators.size) - - var i = 0 - accumulators foreach { - case (name, accumulator) => - val (typeString, valueString) = - if (accumulator != null) { - (accumulator.getClass.getSimpleName, accumulator.toString) - } else { - (null, null) - } - result(i) = new StringifiedAccumulatorResult(name, typeString, valueString) - i += 1 - } - result - case None => - null // TODO check also archive - } } - sender() ! 
AccumulatorResultStringsFound(jobId, accumulatorValues) - } - catch { - case e: Exception => - log.error("Cannot fetch accumulator result", e) - sender() ! AccumulatorResultsErroneous(jobId, e) - } - - case x => unhandled(x) - } + case RequestAccumulatorResultsStringified(jobId) => + currentJobs.get(jobId) match { + case Some((graph, jobInfo)) => + val stringifiedAccumulators = graph.getAccumulatorResultsStringified() + sender() ! AccumulatorResultStringsFound(jobId, stringifiedAccumulators) + case None => + archive.forward(message) + } + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 6d0b220..7572e72 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -24,6 +24,7 @@ import akka.actor.Actor import org.apache.flink.api.common.JobID import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.webmonitor._ import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.executiongraph.ExecutionGraph @@ -120,6 +121,31 @@ class MemoryArchivist(private val max_entries: Int) catch { case t: Throwable => log.error("Exception while creating the jobs overview", t) } + + + case RequestAccumulatorResults(jobID) => + try { + graphs.get(jobID) match { + case Some(graph) => + val accumulatorValues = graph.getAccumulatorsSerialized() + sender() ! AccumulatorResultsFound(jobID, accumulatorValues) + case None => + sender() ! 
AccumulatorResultsNotFound(jobID) + } + } catch { + case e: Exception => + log.error("Cannot serialize accumulator result.", e) + sender() ! AccumulatorResultsErroneous(jobID, e) + } + + case RequestAccumulatorResultsStringified(jobID) => + graphs.get(jobID) match { + case Some(graph) => + val accumulatorValues = graph.getAccumulatorResultsStringified() + sender() ! AccumulatorResultStringsFound(jobID, accumulatorValues) + case None => + sender() ! AccumulatorResultsNotFound(jobID) + } } /**