Repository: flink
Updated Branches:
  refs/heads/release-1.5 071dedcb7 -> 60e05c05f


[FLINK-8881][runtime] Send accumulator updates via heartbeats


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6657aa20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6657aa20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6657aa20

Branch: refs/heads/release-1.5
Commit: 6657aa2090b6b0f56f329152b3aaa1c147e73380
Parents: 071dedc
Author: zentol <[email protected]>
Authored: Wed Mar 14 18:52:16 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 20 19:03:21 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 16 +++++---
 .../runtime/jobmaster/JobMasterGateway.java     |  6 ++-
 .../runtime/taskexecutor/AccumulatorReport.java | 40 ++++++++++++++++++++
 .../runtime/taskexecutor/TaskExecutor.java      | 32 ++++++++++++----
 .../utils/TestingJobMasterGateway.java          |  3 +-
 5 files changed, 81 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f0b29bf..6878032 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
@@ -93,6 +94,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -166,7 +168,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
        private final JobManagerJobMetricGroup jobMetricGroup;
 
        /** The heartbeat manager with task managers. */
-       private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
+       private final HeartbeatManager<AccumulatorReport, Void> 
taskManagerHeartbeatManager;
 
        /** The heartbeat manager with resource manager. */
        private final HeartbeatManager<Void, Void> 
resourceManagerHeartbeatManager;
@@ -938,8 +940,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
        }
 
        @Override
-       public void heartbeatFromTaskManager(final ResourceID resourceID) {
-               taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+       public void heartbeatFromTaskManager(final ResourceID resourceID, 
AccumulatorReport accumulatorReport) {
+               taskManagerHeartbeatManager.receiveHeartbeat(resourceID, 
accumulatorReport);
        }
 
        @Override
@@ -1504,7 +1506,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
                }
        }
 
-       private class TaskManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
+       private class TaskManagerHeartbeatListener implements 
HeartbeatListener<AccumulatorReport, Void> {
 
                private final JobMasterGateway jobMasterGateway;
 
@@ -1522,8 +1524,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
                }
 
                @Override
-               public void reportPayload(ResourceID resourceID, Void payload) {
-                       // nothing to do since there is no payload
+               public void reportPayload(ResourceID resourceID, 
AccumulatorReport payload) {
+                       for (AccumulatorSnapshot snapshot : 
payload.getAccumulatorSnapshots()) {
+                               executionGraph.updateAccumulators(snapshot);
+                       }
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 1e1bdda..4ea9357 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -219,8 +220,11 @@ public interface JobMasterGateway extends
         * Sends the heartbeat to job manager from task manager.
         *
         * @param resourceID unique id of the task manager
+        * @param accumulatorReport report containing accumulator updates
         */
-       void heartbeatFromTaskManager(final ResourceID resourceID);
+       void heartbeatFromTaskManager(
+               final ResourceID resourceID,
+               final AccumulatorReport accumulatorReport);
 
        /**
         * Sends heartbeat request from the resource manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java
new file mode 100644
index 0000000..7c8c767
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A report about the current values of all accumulators of the TaskExecutor 
for a given job.
+ */
+public class AccumulatorReport implements Serializable {
+       private final Collection<AccumulatorSnapshot> accumulatorSnapshots;
+
+       public AccumulatorReport(List<AccumulatorSnapshot> 
accumulatorSnapshots) {
+               this.accumulatorSnapshots = accumulatorSnapshots;
+       }
+
+       public Collection<AccumulatorSnapshot> getAccumulatorSnapshots() {
+               return accumulatorSnapshots;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7409175..f25601e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -107,6 +107,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -138,7 +139,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
        private final TaskManagerConfiguration taskManagerConfiguration;
 
        /** The heartbeat manager for job manager in the task manager. */
-       private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
+       private final HeartbeatManager<Void, AccumulatorReport> 
jobManagerHeartbeatManager;
 
        /** The heartbeat manager for resource manager in the task manager. */
        private final HeartbeatManager<Void, SlotReport> 
resourceManagerHeartbeatManager;
@@ -1050,14 +1051,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                jobManagerTable.put(jobId, newJobManagerConnection);
 
                // monitor the job manager as heartbeat target
-               jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, 
new HeartbeatTarget<Void>() {
+               jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, 
new HeartbeatTarget<AccumulatorReport>() {
                        @Override
-                       public void receiveHeartbeat(ResourceID resourceID, 
Void payload) {
-                               
jobMasterGateway.heartbeatFromTaskManager(resourceID);
+                       public void receiveHeartbeat(ResourceID resourceID, 
AccumulatorReport payload) {
+                               
jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
                        }
 
                        @Override
-                       public void requestHeartbeat(ResourceID resourceID, 
Void payload) {
+                       public void requestHeartbeat(ResourceID resourceID, 
AccumulatorReport payload) {
                                // request heartbeat will never be called on 
the task manager side
                        }
                });
@@ -1488,7 +1489,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                }
        }
 
-       private class JobManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
+       private class JobManagerHeartbeatListener implements 
HeartbeatListener<Void, AccumulatorReport> {
 
                @Override
                public void notifyHeartbeatTimeout(final ResourceID resourceID) 
{
@@ -1515,8 +1516,23 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                }
 
                @Override
-               public CompletableFuture<Void> retrievePayload(ResourceID 
resourceID) {
-                       return CompletableFuture.completedFuture(null);
+               public CompletableFuture<AccumulatorReport> 
retrievePayload(ResourceID resourceID) {
+                       validateRunsInMainThread();
+                       JobManagerConnection jobManagerConnection = 
jobManagerConnections.get(resourceID);
+                       if (jobManagerConnection != null) {
+                               JobID jobId = jobManagerConnection.getJobID();
+
+                               List<AccumulatorSnapshot> accumulatorSnapshots 
= new ArrayList<>(16);
+                               Iterator<Task> allTasks = 
taskSlotTable.getTasks(jobId);
+
+                               while (allTasks.hasNext()) {
+                                       Task task = allTasks.next();
+                                       
accumulatorSnapshots.add(task.getAccumulatorRegistry().getSnapshot());
+                               }
+                               return CompletableFuture.completedFuture(new 
AccumulatorReport(accumulatorSnapshots));
+                       } else {
+                               return CompletableFuture.completedFuture(new 
AccumulatorReport(Collections.emptyList()));
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 0d57a56..65117af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -130,7 +131,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
        }
 
        @Override
-       public void heartbeatFromTaskManager(ResourceID resourceID) {
+       public void heartbeatFromTaskManager(ResourceID resourceID, 
AccumulatorReport accumulatorReport) {
                throw new UnsupportedOperationException();
        }
 

Reply via email to