This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 387b2a473d0c0a8d58d1ca0401894dffc0527b31
Author: Lijie Wang <[email protected]>
AuthorDate: Mon Jul 4 17:52:09 2022 +0800

    [FLINK-28144][runtime] Let JobMaster support blocklist.
    
    This closes #20153.
---
 .../flink/runtime/blocklist/BlockedNode.java       |  2 +-
 .../flink/runtime/blocklist/BlocklistListener.java |  6 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 76 ++++++++++++++++++++-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  4 +-
 .../factories/DefaultJobMasterServiceFactory.java  |  3 +
 .../blocklist/DefaultBlocklistHandlerTest.java     |  5 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 78 ++++++++++++++++++++++
 .../runtime/jobmaster/utils/JobMasterBuilder.java  | 11 +++
 .../jobmaster/utils/TestingJobMasterGateway.java   |  6 ++
 9 files changed, 185 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java
index 945094f5532..4391fe522f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNode.java
@@ -33,7 +33,7 @@ public class BlockedNode implements Serializable {
 
     private final long endTimestamp;
 
-    BlockedNode(String nodeId, String cause, long endTimestamp) {
+    public BlockedNode(String nodeId, String cause, long endTimestamp) {
         this.nodeId = checkNotNull(nodeId);
         this.cause = checkNotNull(cause);
         this.endTimestamp = endTimestamp;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java
index d8a13aa537f..92130539686 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.blocklist;
 
+import org.apache.flink.runtime.messages.Acknowledge;
+
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
 /** A listener that want to be notified when blocklist changes. */
 public interface BlocklistListener {
@@ -27,6 +30,7 @@ public interface BlocklistListener {
      * Notify new blocked node records.
      *
      * @param newNodes the new blocked node records
+     * @return Future acknowledge once the new nodes have been successfully notified.
      */
-    void notifyNewBlockedNodes(Collection<BlockedNode> newNodes);
+    CompletableFuture<Acknowledge> 
notifyNewBlockedNodes(Collection<BlockedNode> newNodes);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3d11f835e36..1d9024f50ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -22,11 +22,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistContext;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.blocklist.BlocklistUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -51,6 +56,8 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import 
org.apache.flink.runtime.jobmaster.slotpool.BlocklistDeclarativeSlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
 import 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
@@ -90,6 +97,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation.ResolutionMode;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -118,6 +126,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.checkpoint.TaskStateSnapshot.deserializeTaskStateSnapshot;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution 
of a single {@link
@@ -208,6 +217,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
 
     private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
 
+    private final BlocklistHandler blocklistHandler;
+
     // ------------------------------------------------------------------------
 
     public JobMaster(
@@ -228,6 +239,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
             PartitionTrackerFactory partitionTrackerFactory,
             ExecutionDeploymentTracker executionDeploymentTracker,
             ExecutionDeploymentReconciler.Factory 
executionDeploymentReconcilerFactory,
+            BlocklistHandler.Factory blocklistHandlerFactory,
             long initializationTimestamp)
             throws Exception {
 
@@ -302,11 +314,21 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
         resourceManagerLeaderRetriever =
                 highAvailabilityServices.getResourceManagerLeaderRetriever();
 
+        this.registeredTaskManagers = new HashMap<>();
+        this.blocklistHandler =
+                blocklistHandlerFactory.create(
+                        new JobMasterBlocklistContext(),
+                        this::getNodeIdOfTaskManager,
+                        getMainThreadExecutor(),
+                        log);
+
         this.slotPoolService =
                 checkNotNull(slotPoolServiceSchedulerFactory)
-                        .createSlotPoolService(jid, new 
DefaultDeclarativeSlotPoolFactory());
+                        .createSlotPoolService(
+                                jid,
+                                createDeclarativeSlotPoolFactory(
+                                        
jobMasterConfiguration.getConfiguration()));
 
-        this.registeredTaskManagers = new HashMap<>(4);
         this.partitionTracker =
                 checkNotNull(partitionTrackerFactory)
                         .create(
@@ -380,6 +402,15 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
                 resourceId, new TaskManagerHeartbeatListener(), 
getMainThreadExecutor(), log);
     }
 
+    private DeclarativeSlotPoolFactory createDeclarativeSlotPoolFactory(
+            Configuration configuration) {
+        if (BlocklistUtils.isBlocklistEnabled(configuration)) {
+            return new 
BlocklistDeclarativeSlotPoolFactory(blocklistHandler::isBlockedTaskManager);
+        } else {
+            return new DefaultDeclarativeSlotPoolFactory();
+        }
+    }
+
     // 
----------------------------------------------------------------------------------------------
     // Lifecycle management
     // 
----------------------------------------------------------------------------------------------
@@ -899,6 +930,12 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> 
notifyNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        blocklistHandler.addNewBlockedNodes(newNodes);
+        return CompletableFuture.completedFuture(Acknowledge.get());
+    }
+
     // 
----------------------------------------------------------------------------------------------
     // Internal methods
     // 
----------------------------------------------------------------------------------------------
@@ -1179,6 +1216,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
         slotPoolService.disconnectResourceManager();
     }
 
+    private String getNodeIdOfTaskManager(ResourceID taskManagerId) {
+        checkState(registeredTaskManagers.containsKey(taskManagerId));
+        return 
registeredTaskManagers.get(taskManagerId).getTaskManagerLocation().getNodeId();
+    }
+
     // 
----------------------------------------------------------------------------------------------
     // Service methods
     // 
----------------------------------------------------------------------------------------------
@@ -1449,4 +1491,34 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
             return null;
         }
     }
+
+    private class JobMasterBlocklistContext implements BlocklistContext {
+
+        @Override
+        public void blockResources(Collection<BlockedNode> blockedNodes) {
+            Set<String> blockedNodeIds =
+                    
blockedNodes.stream().map(BlockedNode::getNodeId).collect(Collectors.toSet());
+
+            Collection<ResourceID> blockedTaskMangers =
+                    registeredTaskManagers.keySet().stream()
+                            .filter(
+                                    taskManagerId ->
+                                            blockedNodeIds.contains(
+                                                    
getNodeIdOfTaskManager(taskManagerId)))
+                            .collect(Collectors.toList());
+
+            blockedTaskMangers.forEach(
+                    taskManagerId -> {
+                        Exception cause =
+                                new FlinkRuntimeException(
+                                        String.format(
+                                                "TaskManager %s is blocked.",
+                                                
taskManagerId.getStringWithMetadata()));
+                        
slotPoolService.releaseFreeSlotsOnTaskManager(taskManagerId, cause);
+                    });
+        }
+
+        @Override
+        public void unblockResources(Collection<BlockedNode> unblockedNodes) {}
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 03c853d4946..4cbd4682d66 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.blocklist.BlocklistListener;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -59,7 +60,8 @@ public interface JobMasterGateway
                 FencedRpcGateway<JobMasterId>,
                 KvStateLocationOracle,
                 KvStateRegistryGateway,
-                JobMasterOperatorEventGateway {
+                JobMasterOperatorEventGateway,
+                BlocklistListener {
 
     /**
      * Cancels the currently executed job.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index bcde4b49475..ed5c7903634 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster.factories;
 
+import org.apache.flink.runtime.blocklist.BlocklistUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -119,6 +120,8 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
                                         jobGraph.getJobID(), shuffleMaster, 
lookup),
                         new DefaultExecutionDeploymentTracker(),
                         DefaultExecutionDeploymentReconciler::new,
+                        BlocklistUtils.loadBlocklistHandlerFactory(
+                                jobMasterConfiguration.getConfiguration()),
                         initializationTimestamp);
 
         jobMaster.start();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java
index f6092d416d6..eee25c7a427 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blocklist;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 
@@ -191,9 +192,11 @@ class DefaultBlocklistHandlerTest {
         private final List<BlockedNode> notifiedNodes = new ArrayList<>();
 
         @Override
-        public void notifyNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        public CompletableFuture<Acknowledge> notifyNewBlockedNodes(
+                Collection<BlockedNode> newNodes) {
             notifiedTimes++;
             notifiedNodes.addAll(newNodes);
+            return CompletableFuture.completedFuture(Acknowledge.get());
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e07ad72516d..1bc43206421 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -36,6 +36,8 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
@@ -1886,6 +1888,82 @@ class JobMasterTest {
         }
     }
 
+    @Test
+    void testBlockResourcesWillTriggerReleaseFreeSlots() throws Exception {
+        JobVertex jobVertex = new JobVertex("jobVertex");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        jobVertex.setParallelism(2);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+
+        final LocalUnresolvedTaskManagerLocation taskManagerLocation =
+                new LocalUnresolvedTaskManagerLocation();
+
+        final CompletableFuture<AllocationID> freedSlotFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<AllocationID> freedSlotFuture2 = new 
CompletableFuture<>();
+        final TestingTaskExecutorGateway testingTaskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setFreeSlotFunction(
+                                (allocationID, throwable) -> {
+                                    if (!freedSlotFuture1.isDone()) {
+                                        
freedSlotFuture1.complete(allocationID);
+                                    } else if (!freedSlotFuture2.isDone()) {
+                                        
freedSlotFuture2.complete(allocationID);
+                                    }
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withConfiguration(configuration)
+                        .withHighAvailabilityServices(haServices)
+                        .withHeartbeatServices(heartbeatServices)
+                        .withBlocklistHandlerFactory(
+                                new 
DefaultBlocklistHandler.Factory(Duration.ofMillis((100L))))
+                        .createJobMaster()) {
+
+            jobMaster.start();
+
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
+
+            final Collection<SlotOffer> slotOffers =
+                    registerSlotsAtJobMaster(
+                            2,
+                            jobMasterGateway,
+                            jobGraph.getJobID(),
+                            testingTaskExecutorGateway,
+                            taskManagerLocation);
+
+            // check that we accepted both offered slots
+            assertThat(slotOffers).hasSize(2);
+
+            waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
+
+            // 1 slot reserved, 1 slot free
+            jobMasterGateway
+                    .updateTaskExecutionState(
+                            new TaskExecutionState(
+                                    getExecutions(jobMasterGateway)
+                                            .iterator()
+                                            .next()
+                                            .getAttemptId(),
+                                    ExecutionState.FINISHED))
+                    .get();
+
+            BlockedNode blockedNode =
+                    new BlockedNode(
+                            taskManagerLocation.getNodeId(),
+                            "Test cause",
+                            System.currentTimeMillis());
+
+            
jobMasterGateway.notifyNewBlockedNodes(Collections.singleton(blockedNode)).get();
+
+            assertThat(freedSlotFuture1).isDone();
+            assertThat(freedSlotFuture2).isNotDone();
+        }
+    }
+
     private void runJobFailureWhenTaskExecutorTerminatesTest(
             HeartbeatServices heartbeatServices,
             BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> 
jobReachedRunningState)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 8aebbc046eb..d1c26edac0f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -18,6 +18,8 @@
 package org.apache.flink.runtime.jobmaster.utils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blocklist.BlocklistHandler;
+import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -86,6 +88,8 @@ public class JobMasterBuilder {
     private ExecutionDeploymentReconciler.Factory 
executionDeploymentReconcilerFactory =
             DefaultExecutionDeploymentReconciler::new;
 
+    private BlocklistHandler.Factory blocklistHandlerFactory = new 
NoOpBlocklistHandler.Factory();
+
     public JobMasterBuilder(JobGraph jobGraph, RpcService rpcService) {
         TestingHighAvailabilityServices testingHighAvailabilityServices =
                 new TestingHighAvailabilityServices();
@@ -167,6 +171,12 @@ public class JobMasterBuilder {
         return this;
     }
 
+    public JobMasterBuilder withBlocklistHandlerFactory(
+            BlocklistHandler.Factory blocklistHandlerFactory) {
+        this.blocklistHandlerFactory = blocklistHandlerFactory;
+        return this;
+    }
+
     public JobMasterBuilder withJobMasterId(JobMasterId jobMasterId) {
         this.jobMasterId = jobMasterId;
         return this;
@@ -199,6 +209,7 @@ public class JobMasterBuilder {
                 partitionTrackerFactory,
                 executionDeploymentTracker,
                 executionDeploymentReconcilerFactory,
+                blocklistHandlerFactory,
                 System.currentTimeMillis());
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 33e1fdb0261..ca840018efe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.blocklist.BlockedNode;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -536,4 +537,9 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
             Collection<ResultPartitionID> partitionIds) {
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public CompletableFuture<Acknowledge> 
notifyNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        return CompletableFuture.completedFuture(Acknowledge.get());
+    }
 }

Reply via email to