Repository: flink
Updated Branches:
  refs/heads/release-1.5 071dedcb7 -> 60e05c05f
[FLINK-8881][runtime] Send accumulator updates via heartbeats Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6657aa20 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6657aa20 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6657aa20 Branch: refs/heads/release-1.5 Commit: 6657aa2090b6b0f56f329152b3aaa1c147e73380 Parents: 071dedc Author: zentol <[email protected]> Authored: Wed Mar 14 18:52:16 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Mar 20 19:03:21 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 16 +++++--- .../runtime/jobmaster/JobMasterGateway.java | 6 ++- .../runtime/taskexecutor/AccumulatorReport.java | 40 ++++++++++++++++++++ .../runtime/taskexecutor/TaskExecutor.java | 32 ++++++++++++---- .../utils/TestingJobMasterGateway.java | 3 +- 5 files changed, 81 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index f0b29bf..6878032 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -27,6 +27,7 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobServer; import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; @@ -93,6 +94,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskexecutor.AccumulatorReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -166,7 +168,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private final JobManagerJobMetricGroup jobMetricGroup; /** The heartbeat manager with task managers. */ - private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager; + private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager; /** The heartbeat manager with resource manager. 
*/ private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager; @@ -938,8 +940,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } @Override - public void heartbeatFromTaskManager(final ResourceID resourceID) { - taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null); + public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) { + taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport); } @Override @@ -1504,7 +1506,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } } - private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> { + private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> { private final JobMasterGateway jobMasterGateway; @@ -1522,8 +1524,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } @Override - public void reportPayload(ResourceID resourceID, Void payload) { - // nothing to do since there is no payload + public void reportPayload(ResourceID resourceID, AccumulatorReport payload) { + for (AccumulatorSnapshot snapshot : payload.getAccumulatorSnapshots()) { + executionGraph.updateAccumulators(snapshot); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 1e1bdda..4ea9357 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.taskexecutor.AccumulatorReport; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -219,8 +220,11 @@ public interface JobMasterGateway extends * Sends the heartbeat to job manager from task manager. * * @param resourceID unique id of the task manager + * @param accumulatorReport report containing accumulator updates */ - void heartbeatFromTaskManager(final ResourceID resourceID); + void heartbeatFromTaskManager( + final ResourceID resourceID, + final AccumulatorReport accumulatorReport); /** * Sends heartbeat request from the resource manager. http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java new file mode 100644 index 0000000..7c8c767 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; + +/** + * A report about the current values of all accumulators of the TaskExecutor for a given job. + */ +public class AccumulatorReport implements Serializable { + private final Collection<AccumulatorSnapshot> accumulatorSnapshots; + + public AccumulatorReport(List<AccumulatorSnapshot> accumulatorSnapshots) { + this.accumulatorSnapshots = accumulatorSnapshots; + } + + public Collection<AccumulatorSnapshot> getAccumulatorSnapshots() { + return accumulatorSnapshots; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 7409175..f25601e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -107,6 +107,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -138,7 +139,7 @@ public class 
TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final TaskManagerConfiguration taskManagerConfiguration; /** The heartbeat manager for job manager in the task manager. */ - private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; + private final HeartbeatManager<Void, AccumulatorReport> jobManagerHeartbeatManager; /** The heartbeat manager for resource manager in the task manager. */ private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager; @@ -1050,14 +1051,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { jobManagerTable.put(jobId, newJobManagerConnection); // monitor the job manager as heartbeat target - jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>() { + jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>() { @Override - public void receiveHeartbeat(ResourceID resourceID, Void payload) { - jobMasterGateway.heartbeatFromTaskManager(resourceID); + public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) { + jobMasterGateway.heartbeatFromTaskManager(resourceID, payload); } @Override - public void requestHeartbeat(ResourceID resourceID, Void payload) { + public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) { // request heartbeat will never be called on the task manager side } }); @@ -1488,7 +1489,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } } - private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> { + private class JobManagerHeartbeatListener implements HeartbeatListener<Void, AccumulatorReport> { @Override public void notifyHeartbeatTimeout(final ResourceID resourceID) { @@ -1515,8 +1516,23 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } @Override - public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { - 
return CompletableFuture.completedFuture(null); + public CompletableFuture<AccumulatorReport> retrievePayload(ResourceID resourceID) { + validateRunsInMainThread(); + JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID); + if (jobManagerConnection != null) { + JobID jobId = jobManagerConnection.getJobID(); + + List<AccumulatorSnapshot> accumulatorSnapshots = new ArrayList<>(16); + Iterator<Task> allTasks = taskSlotTable.getTasks(jobId); + + while (allTasks.hasNext()) { + Task task = allTasks.next(); + accumulatorSnapshots.add(task.getAccumulatorRegistry().getSnapshot()); + } + return CompletableFuture.completedFuture(new AccumulatorReport(accumulatorSnapshots)); + } else { + return CompletableFuture.completedFuture(new AccumulatorReport(Collections.emptyList())); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6657aa20/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 0d57a56..65117af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskexecutor.AccumulatorReport; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -130,7 +131,7 @@ public class TestingJobMasterGateway implements JobMasterGateway { } @Override - public void heartbeatFromTaskManager(ResourceID resourceID) { + public void heartbeatFromTaskManager(ResourceID resourceID, AccumulatorReport accumulatorReport) { throw new UnsupportedOperationException(); }
