Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/934#discussion_r35991305
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -560,53 +621,177 @@ public ExecutionContext getExecutionContext() {
 	}

 	/**
+	 * This works as a cache for already merged accumulators, as, in some cases,
+	 * we do not want to re-merge accumulators, since this may lead to duplicate entries.
+	 */
+	private Map<String, Accumulator<?, ?>> mergedSmallUserAccumulators;
+
+	/**
 	 * Merges all accumulator results from the tasks previously executed in the Executions.
+	 *
 	 * @return The accumulator map
 	 */
-	public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
+	public Map<String, Accumulator<?, ?>> aggregateSmallUserAccumulators() {
+		return aggregateSmallUserAccumulators(true);
+	}

-		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+	/**
+	 * Merges all accumulator results from the tasks previously executed in the Executions.
+	 * If <code>reaggregate</code> is set to false, then no aggregation is performed, and
+	 * the cached merge result is returned. Otherwise, the accumulators are merged.
+	 *
+	 * @param reaggregate <code>true</code> if we want to aggregate the accumulators,
+	 *                    <code>false</code> otherwise.
+	 * @return The accumulator map
+	 */
+	public Map<String, Accumulator<?, ?>> aggregateSmallUserAccumulators(boolean reaggregate) {
+		if (!reaggregate) {
+			return mergedSmallUserAccumulators;
+		}
+		this.mergedSmallUserAccumulators = new HashMap<String, Accumulator<?, ?>>();
 		for (ExecutionVertex vertex : getAllExecutionVertices()) {
-			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
+			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getSmallUserAccumulators();
 			if (next != null) {
-				AccumulatorHelper.mergeInto(userAccumulators, next);
+				AccumulatorHelper.mergeInto(mergedSmallUserAccumulators, next);
 			}
 		}
+		return mergedSmallUserAccumulators;
+	}

-		return userAccumulators;
+	/**
+	 * Merges all blobKeys referring to blobs of large accumulators. These refer to blobs in the
+	 * BlobCache holding accumulators (results of tasks) that did not fit in an Akka frame,
+	 * and thus had to be sent through the BlobCache.
+	 *
+	 * @return The accumulator map
+	 */
+	public Map<String, List<BlobKey>> aggregateLargeUserAccumulatorBlobKeys() {
+		Map<String, List<BlobKey>> largeUserAccumulatorRefs = new HashMap<String, List<BlobKey>>();
+
+		for (ExecutionVertex vertex : getAllExecutionVertices()) {
+			Map<String, List<BlobKey>> next = vertex.getCurrentExecutionAttempt().getLargeUserAccumulatorBlobKeys();
+			mergeLargeUserAccumulatorBlobKeys(largeUserAccumulatorRefs, next);
+		}
+		return largeUserAccumulatorRefs;
 	}

 	/**
-	 * Gets a serialized accumulator map.
+	 * Adds new blobKeys referring to blobs of large accumulators to the already existing ones.
+	 * These refer to blobs in the BlobCache holding accumulators (results of tasks) that did not
+	 * fit in an Akka frame, and thus had to be sent through the BlobCache.
+	 *
+	 * @param target the initial blobKey map
+	 * @param toMerge the new keys to add to the initial map
+	 * @return The resulting accumulator map
+	 */
+	public Map<String, List<BlobKey>> addLargeUserAccumulatorBlobKeys(
+			Map<String, List<BlobKey>> target, Map<String, List<BlobKey>> toMerge) {
+		if (target == null) {
+			target = new HashMap<String, List<BlobKey>>();
+		}
+		mergeLargeUserAccumulatorBlobKeys(target, toMerge);
+		return target;
+	}
+
+	private void mergeLargeUserAccumulatorBlobKeys(
+			Map<String, List<BlobKey>> target, Map<String, List<BlobKey>> toMerge) {
+		if (toMerge == null || toMerge.isEmpty()) {
+			return;
+		}
+
+		for (Map.Entry<String, List<BlobKey>> otherEntry : toMerge.entrySet()) {
+			List<BlobKey> existing = target.get(otherEntry.getKey());
+			if (existing == null) {
+				target.put(otherEntry.getKey(), otherEntry.getValue());
+			} else {
+				existing.addAll(otherEntry.getValue());
+			}
+		}
+	}
+
+	/**
+	 * Merges the partial accumulators received from the tasks that refer to the same global
+	 * accumulator, and serializes the final result. Each partial accumulator contains the
+	 * partial result produced by one task, for that specific accumulator.
+	 *
 	 * @return The accumulator map with serialized accumulator values.
 	 * @throws IOException
 	 */
-	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
-
-		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
+	public Map<String, SerializedValue<Object>> getSmallAccumulatorsContentSerialized() throws IOException {
+		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateSmallUserAccumulators(true);
 		Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
 		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
 			result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
 		}
+		return result;
+	}
+
+	/**
+	 * Serializes the accumulator objects themselves (not only their content, as
+	 * {@link #getSmallAccumulatorsContentSerialized()} does). This means that the actual
+	 * objects are serialized, so merging can still be applied after deserialization.
+	 * This method assumes that the partial accumulators received from the tasks that refer
+	 * to the same global accumulator have already been merged.
+	 *
+	 * @return The accumulator map with serialized accumulator objects.
+	 * @throws IOException
+	 */
+	public Map<String, SerializedValue<Object>> getSmallAccumulatorsSerialized() throws IOException {
+		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateSmallUserAccumulators(false);
+		Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
+		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
+			result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue()));
+		}
 		return result;
 	}

+//	/**
+//	 * Merges and serializes all accumulator results from the tasks previously executed in
+//	 * the Executions. If <code>onlyContent</code> is set to true, then the accumulators are
+//	 * merged and the content of the resulting accumulator is serialized and returned.
+//	 * Otherwise, the result is assumed to be already merged, so no additional merging is
+//	 * performed (as this could lead to duplicate entries), and the whole accumulator object
+//	 * is serialized and returned.
+//	 *
+//	 * @param onlyContent <code>true</code> if we want to aggregate the accumulators and
+//	 *                    serialize just the content of the result, <code>false</code> if the
+//	 *                    (partial) accumulators are already merged (so no additional merging
+//	 *                    is required), and we want the whole object serialized.
+//	 * @return The accumulator map
+//	 */
+//	private Map<String, SerializedValue<Object>> serializeAccumulators(boolean onlyContent) throws IOException {
+//		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateSmallUserAccumulators(onlyContent);
+//
+//		Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
+//		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
+//			Object toSerialize = onlyContent ? entry.getValue().getLocalValue() : entry.getValue();
+//			result.put(entry.getKey(), new SerializedValue<Object>(toSerialize));
+//		}
+//
+//		return result;
+//	}
+
--- End diff --
Commented out.
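
If this consolidated helper is meant to stay rather than be deleted, the two public serialization methods above could simply delegate to it instead of duplicating the loop. A minimal sketch of that delegation (same names and signatures as in the diff, untested):

```java
public Map<String, SerializedValue<Object>> getSmallAccumulatorsContentSerialized() throws IOException {
	// Re-aggregates the accumulators and serializes only their local values.
	return serializeAccumulators(true);
}

public Map<String, SerializedValue<Object>> getSmallAccumulatorsSerialized() throws IOException {
	// Reuses the cached merge result and serializes the accumulator objects
	// themselves, so they can still be merged after deserialization.
	return serializeAccumulators(false);
}
```

Otherwise, the dead block should be removed rather than left commented out.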
---