This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 387b2a473d0c0a8d58d1ca0401894dffc0527b31 Author: Lijie Wang <[email protected]> AuthorDate: Mon Jul 4 17:52:09 2022 +0800 [FLINK-28144][runtime] Let JobMaster support blocklist. This closes #20153. --- .../flink/runtime/blocklist/BlockedNode.java | 2 +- .../flink/runtime/blocklist/BlocklistListener.java | 6 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 76 ++++++++++++++++++++- .../flink/runtime/jobmaster/JobMasterGateway.java | 4 +- .../factories/DefaultJobMasterServiceFactory.java | 3 + .../blocklist/DefaultBlocklistHandlerTest.java | 5 +- .../flink/runtime/jobmaster/JobMasterTest.java | 78 ++++++++++++++++++++++ .../runtime/jobmaster/utils/JobMasterBuilder.java | 11 +++ .../jobmaster/utils/TestingJobMasterGateway.java | 6 ++ 9 files changed, 185 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java index 945094f5532..4391fe522f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java @@ -33,7 +33,7 @@ public class BlockedNode implements Serializable { private final long endTimestamp; - BlockedNode(String nodeId, String cause, long endTimestamp) { + public BlockedNode(String nodeId, String cause, long endTimestamp) { this.nodeId = checkNotNull(nodeId); this.cause = checkNotNull(cause); this.endTimestamp = endTimestamp; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java index d8a13aa537f..92130539686 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java @@ -18,7 +18,10 @@ package 
org.apache.flink.runtime.blocklist; +import org.apache.flink.runtime.messages.Acknowledge; + import java.util.Collection; +import java.util.concurrent.CompletableFuture; /** A listener that want to be notified when blocklist changes. */ public interface BlocklistListener { @@ -27,6 +30,7 @@ public interface BlocklistListener { * Notify new blocked node records. * * @param newNodes the new blocked node records + * @return Future acknowledge once the new nodes have been successfully notified. */ - void notifyNewBlockedNodes(Collection<BlockedNode> newNodes); + CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNode> newNodes); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 3d11f835e36..1d9024f50ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -22,11 +22,16 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.blocklist.BlockedNode; +import org.apache.flink.runtime.blocklist.BlocklistContext; +import org.apache.flink.runtime.blocklist.BlocklistHandler; +import org.apache.flink.runtime.blocklist.BlocklistUtils; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import 
org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -51,6 +56,8 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; +import org.apache.flink.runtime.jobmaster.slotpool.BlocklistDeclarativeSlotPoolFactory; +import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory; import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolFactory; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; @@ -90,6 +97,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation.ResolutionMode; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; @@ -118,6 +126,7 @@ import java.util.stream.Collectors; import static org.apache.flink.runtime.checkpoint.TaskStateSnapshot.deserializeTaskStateSnapshot; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * JobMaster implementation. 
The job master is responsible for the execution of a single {@link @@ -208,6 +217,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager; + private final BlocklistHandler blocklistHandler; + // ------------------------------------------------------------------------ public JobMaster( @@ -228,6 +239,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> PartitionTrackerFactory partitionTrackerFactory, ExecutionDeploymentTracker executionDeploymentTracker, ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory, + BlocklistHandler.Factory blocklistHandlerFactory, long initializationTimestamp) throws Exception { @@ -302,11 +314,21 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); + this.registeredTaskManagers = new HashMap<>(); + this.blocklistHandler = + blocklistHandlerFactory.create( + new JobMasterBlocklistContext(), + this::getNodeIdOfTaskManager, + getMainThreadExecutor(), + log); + this.slotPoolService = checkNotNull(slotPoolServiceSchedulerFactory) - .createSlotPoolService(jid, new DefaultDeclarativeSlotPoolFactory()); + .createSlotPoolService( + jid, + createDeclarativeSlotPoolFactory( + jobMasterConfiguration.getConfiguration())); - this.registeredTaskManagers = new HashMap<>(4); this.partitionTracker = checkNotNull(partitionTrackerFactory) .create( @@ -380,6 +402,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log); } + private DeclarativeSlotPoolFactory createDeclarativeSlotPoolFactory( + Configuration configuration) { + if (BlocklistUtils.isBlocklistEnabled(configuration)) { + return new BlocklistDeclarativeSlotPoolFactory(blocklistHandler::isBlockedTaskManager); + } else { + return new DefaultDeclarativeSlotPoolFactory(); + } + } + // 
---------------------------------------------------------------------------------------------- // Lifecycle management // ---------------------------------------------------------------------------------------------- @@ -899,6 +930,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> return future; } + @Override + public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNode> newNodes) { + blocklistHandler.addNewBlockedNodes(newNodes); + return CompletableFuture.completedFuture(Acknowledge.get()); + } + // ---------------------------------------------------------------------------------------------- // Internal methods // ---------------------------------------------------------------------------------------------- @@ -1179,6 +1216,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> slotPoolService.disconnectResourceManager(); } + private String getNodeIdOfTaskManager(ResourceID taskManagerId) { + checkState(registeredTaskManagers.containsKey(taskManagerId)); + return registeredTaskManagers.get(taskManagerId).getTaskManagerLocation().getNodeId(); + } + // ---------------------------------------------------------------------------------------------- // Service methods // ---------------------------------------------------------------------------------------------- @@ -1449,4 +1491,34 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> return null; } } + + private class JobMasterBlocklistContext implements BlocklistContext { + + @Override + public void blockResources(Collection<BlockedNode> blockedNodes) { + Set<String> blockedNodeIds = + blockedNodes.stream().map(BlockedNode::getNodeId).collect(Collectors.toSet()); + + Collection<ResourceID> blockedTaskMangers = + registeredTaskManagers.keySet().stream() + .filter( + taskManagerId -> + blockedNodeIds.contains( + getNodeIdOfTaskManager(taskManagerId))) + .collect(Collectors.toList()); + + blockedTaskMangers.forEach( + taskManagerId -> { + Exception 
cause = + new FlinkRuntimeException( + String.format( + "TaskManager %s is blocked.", + taskManagerId.getStringWithMetadata())); + slotPoolService.releaseFreeSlotsOnTaskManager(taskManagerId, cause); + }); + } + + @Override + public void unblockResources(Collection<BlockedNode> unblockedNodes) {} + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 03c853d4946..4cbd4682d66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.blocklist.BlocklistListener; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -59,7 +60,8 @@ public interface JobMasterGateway FencedRpcGateway<JobMasterId>, KvStateLocationOracle, KvStateRegistryGateway, - JobMasterOperatorEventGateway { + JobMasterOperatorEventGateway, + BlocklistListener { /** * Cancels the currently executed job. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java index bcde4b49475..ed5c7903634 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmaster.factories; +import org.apache.flink.runtime.blocklist.BlocklistUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -119,6 +120,8 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory { jobGraph.getJobID(), shuffleMaster, lookup), new DefaultExecutionDeploymentTracker(), DefaultExecutionDeploymentReconciler::new, + BlocklistUtils.loadBlocklistHandlerFactory( + jobMasterConfiguration.getConfiguration()), initializationTimestamp); jobMaster.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java index f6092d416d6..eee25c7a427 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.blocklist; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.messages.Acknowledge; import 
org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; @@ -191,9 +192,11 @@ class DefaultBlocklistHandlerTest { private final List<BlockedNode> notifiedNodes = new ArrayList<>(); @Override - public void notifyNewBlockedNodes(Collection<BlockedNode> newNodes) { + public CompletableFuture<Acknowledge> notifyNewBlockedNodes( + Collection<BlockedNode> newNodes) { notifiedTimes++; notifiedNodes.addAll(newNodes); + return CompletableFuture.completedFuture(Acknowledge.get()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index e07ad72516d..1bc43206421 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -36,6 +36,8 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blocklist.BlockedNode; +import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler; import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; @@ -1886,6 +1888,82 @@ class JobMasterTest { } } + @Test + void testBlockResourcesWillTriggerReleaseFreeSlots() throws Exception { + JobVertex jobVertex = new JobVertex("jobVertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(2); + final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex); + + final LocalUnresolvedTaskManagerLocation taskManagerLocation = + new LocalUnresolvedTaskManagerLocation(); + + final CompletableFuture<AllocationID> freedSlotFuture1 = new 
CompletableFuture<>(); + final CompletableFuture<AllocationID> freedSlotFuture2 = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction( + (allocationID, throwable) -> { + if (!freedSlotFuture1.isDone()) { + freedSlotFuture1.complete(allocationID); + } else if (!freedSlotFuture2.isDone()) { + freedSlotFuture2.complete(allocationID); + } + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); + + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService) + .withConfiguration(configuration) + .withHighAvailabilityServices(haServices) + .withHeartbeatServices(heartbeatServices) + .withBlocklistHandlerFactory( + new DefaultBlocklistHandler.Factory(Duration.ofMillis((100L)))) + .createJobMaster()) { + + jobMaster.start(); + + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); + + final Collection<SlotOffer> slotOffers = + registerSlotsAtJobMaster( + 2, + jobMasterGateway, + jobGraph.getJobID(), + testingTaskExecutorGateway, + taskManagerLocation); + + // check that we accepted the offered slot + assertThat(slotOffers).hasSize(2); + + waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway); + + // 1 slot reserved, 1 slot free + jobMasterGateway + .updateTaskExecutionState( + new TaskExecutionState( + getExecutions(jobMasterGateway) + .iterator() + .next() + .getAttemptId(), + ExecutionState.FINISHED)) + .get(); + + BlockedNode blockedNode = + new BlockedNode( + taskManagerLocation.getNodeId(), + "Test cause", + System.currentTimeMillis()); + + jobMasterGateway.notifyNewBlockedNodes(Collections.singleton(blockedNode)).get(); + + assertThat(freedSlotFuture1).isDone(); + assertThat(freedSlotFuture2).isNotDone(); + } + } + private void runJobFailureWhenTaskExecutorTerminatesTest( HeartbeatServices heartbeatServices, 
BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> jobReachedRunningState) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java index 8aebbc046eb..d1c26edac0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.jobmaster.utils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blocklist.BlocklistHandler; +import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -86,6 +88,8 @@ public class JobMasterBuilder { private ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory = DefaultExecutionDeploymentReconciler::new; + private BlocklistHandler.Factory blocklistHandlerFactory = new NoOpBlocklistHandler.Factory(); + public JobMasterBuilder(JobGraph jobGraph, RpcService rpcService) { TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices(); @@ -167,6 +171,12 @@ public class JobMasterBuilder { return this; } + public JobMasterBuilder withBlocklistHandlerFactory( + BlocklistHandler.Factory blocklistHandlerFactory) { + this.blocklistHandlerFactory = blocklistHandlerFactory; + return this; + } + public JobMasterBuilder withJobMasterId(JobMasterId jobMasterId) { this.jobMasterId = jobMasterId; return this; @@ -199,6 +209,7 @@ public class JobMasterBuilder { partitionTrackerFactory, executionDeploymentTracker, executionDeploymentReconcilerFactory, + blocklistHandlerFactory, System.currentTimeMillis()); } diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 33e1fdb0261..ca840018efe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.blocklist.BlockedNode; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -536,4 +537,9 @@ public class TestingJobMasterGateway implements JobMasterGateway { Collection<ResultPartitionID> partitionIds) { return CompletableFuture.completedFuture(null); } + + @Override + public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNode> newNodes) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } }
