This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b174deec01552f28b6ca1003d07831396ce48117
Author: Lijie Wang <[email protected]>
AuthorDate: Mon Jun 27 14:51:47 2022 +0800

    [FLINK-28143][runtime] Introduce BlocklistHandler.
    
    This closes #20079.
---
 .../flink/runtime/blocklist/BlocklistContext.java  |  39 +++++
 .../flink/runtime/blocklist/BlocklistHandler.java  |  67 ++++++++
 .../flink/runtime/blocklist/BlocklistListener.java |  32 ++++
 .../runtime/blocklist/DefaultBlocklistHandler.java | 178 +++++++++++++++++++++
 .../blocklist/DefaultBlocklistHandlerTest.java     | 173 ++++++++++++++++++++
 5 files changed, 489 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java
new file mode 100644
index 00000000000..c31adbf5410
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** This class is responsible for blocking and unblocking resources located on nodes. */
+public interface BlocklistContext {
+
+    /**
+     * Blocks the resources located on the given nodes.
+     *
+     * @param blockedNodes the nodes whose resources should be blocked
+     */
+    void blockResources(Collection<BlockedNode> blockedNodes);
+
+    /**
+     * Unblocks the resources located on the given nodes.
+     *
+     * @param unblockedNodes the nodes whose resources should be unblocked
+     */
+    void unblockResources(Collection<BlockedNode> unblockedNodes);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java
new file mode 100644
index 00000000000..9247595125f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.util.Collection;
+import java.util.Set;
+
+/** Manages all {@link BlockedNode}s and applies the blocking they describe to resources. */
+public interface BlocklistHandler {
+
+    /**
+     * Adds the given blocked node records. A record whose node (identified by its node id) is
+     * already blocked is merged into the existing record rather than added separately.
+     *
+     * @param newNodes the blocked node records to add
+     */
+    void addNewBlockedNodes(Collection<BlockedNode> newNodes);
+
+    /**
+     * Checks whether the given task manager is blocked, i.e. whether it is located on a blocked
+     * node.
+     *
+     * @param taskManagerId the ID of the task manager to check
+     * @return {@code true} if the task manager is blocked, {@code false} otherwise
+     */
+    boolean isBlockedTaskManager(ResourceID taskManagerId);
+
+    /**
+     * Returns the ids of all currently blocked nodes.
+     *
+     * @return the set of all blocked node ids
+     */
+    Set<String> getAllBlockedNodeIds();
+
+    /**
+     * Registers a listener that should be notified about blocklist changes.
+     *
+     * @param blocklistListener the listener to register
+     */
+    void registerBlocklistListener(BlocklistListener blocklistListener);
+
+    /**
+     * Removes a previously registered blocklist listener.
+     *
+     * @param blocklistListener the listener to remove
+     */
+    void deregisterBlocklistListener(BlocklistListener blocklistListener);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java
new file mode 100644
index 00000000000..d8a13aa537f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** A listener that wants to be notified when the blocklist changes. */
+public interface BlocklistListener {
+
+    /**
+     * Notifies this listener about new blocked node records.
+     *
+     * @param newNodes the new blocked node records
+     */
+    void notifyNewBlockedNodes(Collection<BlockedNode> newNodes);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java
new file mode 100644
index 00000000000..d1f30dcdf50
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link BlocklistHandler}.
+ *
+ * <p>The public query/update methods assert that they are called from the main thread of the
+ * given {@link ComponentMainThreadExecutor}. A timeout check is scheduled on that executor every
+ * {@code timeoutCheckInterval}; it removes timed-out nodes from the {@link BlocklistTracker} and
+ * unblocks their resources via the {@link BlocklistContext}. {@link #close()} cancels the pending
+ * check.
+ */
+public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
+
+    private final Logger log;
+
+    /** Resolves the id of the node a task manager is located on. */
+    private final Function<ResourceID, String> taskManagerNodeIdRetriever;
+
+    private final BlocklistTracker blocklistTracker;
+
+    private final BlocklistContext blocklistContext;
+
+    /** Registered listeners; only accessed from methods that assert the main thread. */
+    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();
+
+    private final Time timeoutCheckInterval;
+
+    /** The currently pending timeout check; cancelled by {@link #close()}. */
+    private volatile ScheduledFuture<?> timeoutCheckFuture;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    /**
+     * Creates the handler and schedules its first timeout check.
+     *
+     * @param blocklistTracker tracks the blocked nodes
+     * @param blocklistContext blocks and unblocks the resources on nodes
+     * @param taskManagerNodeIdRetriever resolves the node id of a task manager; must not return
+     *     {@code null} for task managers passed to {@link #isBlockedTaskManager(ResourceID)}
+     * @param timeoutCheckInterval the interval between two checks for timed-out blocked nodes
+     * @param mainThreadExecutor the main thread executor, used to run the timeout check and to
+     *     assert that public methods are called from the main thread
+     * @param log the logger used to report blocklist changes
+     */
+    public DefaultBlocklistHandler(
+            BlocklistTracker blocklistTracker,
+            BlocklistContext blocklistContext,
+            Function<ResourceID, String> taskManagerNodeIdRetriever,
+            Time timeoutCheckInterval,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Logger log) {
+        this.blocklistTracker = checkNotNull(blocklistTracker);
+        this.blocklistContext = checkNotNull(blocklistContext);
+        this.taskManagerNodeIdRetriever = checkNotNull(taskManagerNodeIdRetriever);
+        this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.log = checkNotNull(log);
+
+        scheduleTimeoutCheck();
+    }
+
+    /** Schedules a timeout check that re-schedules itself after it has run. */
+    private void scheduleTimeoutCheck() {
+        this.timeoutCheckFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            removeTimeoutNodes();
+                            scheduleTimeoutCheck();
+                        },
+                        timeoutCheckInterval.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    /** Removes the timed-out nodes from the tracker and unblocks the resources on them. */
+    private void removeTimeoutNodes() {
+        Collection<BlockedNode> removedNodes =
+                blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
+        if (removedNodes.isEmpty()) {
+            return;
+        }
+
+        // fetch the remaining nodes once instead of once per log argument
+        Collection<BlockedNode> allBlockedNodes = blocklistTracker.getAllBlockedNodes();
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Remove {} timeout blocked nodes, details {}. "
+                            + "Total {} blocked nodes currently, details: {}.",
+                    removedNodes.size(),
+                    removedNodes,
+                    allBlockedNodes.size(),
+                    allBlockedNodes);
+        } else {
+            log.info(
+                    "Remove {} timeout blocked nodes. Total {} blocked nodes currently.",
+                    removedNodes.size(),
+                    allBlockedNodes.size());
+        }
+        blocklistContext.unblockResources(removedNodes);
+    }
+
+    private void assertRunningInMainThread() {
+        mainThreadExecutor.assertRunningInMainThread();
+    }
+
+    @Override
+    public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
+        assertRunningInMainThread();
+
+        Collection<BlockedNode> newlyAddedOrMerged = blocklistTracker.addNewBlockedNodes(newNodes);
+        if (newlyAddedOrMerged.isEmpty()) {
+            return;
+        }
+
+        // fetch all nodes once instead of once per log argument
+        Collection<BlockedNode> allBlockedNodes = blocklistTracker.getAllBlockedNodes();
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Newly added/merged {} blocked nodes, details: {}."
+                            + " Total {} blocked nodes currently, details: {}.",
+                    newlyAddedOrMerged.size(),
+                    newlyAddedOrMerged,
+                    allBlockedNodes.size(),
+                    allBlockedNodes);
+        } else {
+            log.info(
+                    "Newly added/merged {} blocked nodes. Total {} blocked nodes currently.",
+                    newlyAddedOrMerged.size(),
+                    allBlockedNodes.size());
+        }
+
+        // Notify a snapshot of the listeners: a listener may (de)register listeners from within
+        // its callback, and modifying the HashSet while iterating over it would otherwise throw
+        // a ConcurrentModificationException.
+        for (BlocklistListener listener : blocklistListeners.toArray(new BlocklistListener[0])) {
+            listener.notifyNewBlockedNodes(newlyAddedOrMerged);
+        }
+        blocklistContext.blockResources(newlyAddedOrMerged);
+    }
+
+    @Override
+    public boolean isBlockedTaskManager(ResourceID taskManagerId) {
+        assertRunningInMainThread();
+        // fail fast with a descriptive message if the node of the task manager is unknown
+        String nodeId =
+                checkNotNull(
+                        taskManagerNodeIdRetriever.apply(taskManagerId),
+                        "Could not retrieve the node id of task manager %s.",
+                        taskManagerId);
+        return blocklistTracker.isBlockedNode(nodeId);
+    }
+
+    @Override
+    public Set<String> getAllBlockedNodeIds() {
+        assertRunningInMainThread();
+
+        return blocklistTracker.getAllBlockedNodeIds();
+    }
+
+    /**
+     * Registers the listener. A newly registered listener is immediately notified about all
+     * currently blocked nodes (if any); registering an already registered listener is a no-op.
+     */
+    @Override
+    public void registerBlocklistListener(BlocklistListener blocklistListener) {
+        assertRunningInMainThread();
+
+        checkNotNull(blocklistListener);
+        if (blocklistListeners.add(blocklistListener)) {
+            Collection<BlockedNode> allBlockedNodes = blocklistTracker.getAllBlockedNodes();
+            if (!allBlockedNodes.isEmpty()) {
+                blocklistListener.notifyNewBlockedNodes(allBlockedNodes);
+            }
+        }
+    }
+
+    @Override
+    public void deregisterBlocklistListener(BlocklistListener blocklistListener) {
+        assertRunningInMainThread();
+
+        checkNotNull(blocklistListener);
+        blocklistListeners.remove(blocklistListener);
+    }
+
+    /** Cancels the pending timeout check without interrupting it if it is already running. */
+    @Override
+    public void close() throws Exception {
+        if (timeoutCheckFuture != null) {
+            timeoutCheckFuture.cancel(false);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java
new file mode 100644
index 00000000000..ff0ea85fffe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultBlocklistHandler}. */
+class DefaultBlocklistHandlerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
+
+    @Test
+    void testAddNewBlockedNodes() throws Exception {
+        BlockedNode node1 = new BlockedNode("node1", "cause", Long.MAX_VALUE);
+        BlockedNode node2 = new BlockedNode("node2", "cause", Long.MAX_VALUE);
+
+        TestBlocklistContext context = new TestBlocklistContext();
+        TestBlocklistListener listener = new TestBlocklistListener();
+
+        try (DefaultBlocklistHandler handler = createDefaultBlocklistHandler(context)) {
+            // nothing is blocked yet, so registering must not notify the listener
+            handler.registerBlocklistListener(listener);
+            assertThat(listener.notifiedTimes).isZero();
+            assertThat(listener.notifiedNodes).isEmpty();
+            assertThat(context.allBlockedNodes).isEmpty();
+
+            // add node1, node2: both the listener and the context should see them
+            handler.addNewBlockedNodes(Arrays.asList(node1, node2));
+            assertThat(listener.notifiedTimes).isEqualTo(1);
+            assertThat(listener.notifiedNodes).containsExactlyInAnyOrder(node1, node2);
+            assertThat(context.allBlockedNodes).containsExactlyInAnyOrder(node1, node2);
+
+            // add node1, node2 again, should not notify listener
+            handler.addNewBlockedNodes(Arrays.asList(node1, node2));
+            assertThat(listener.notifiedTimes).isEqualTo(1);
+
+            // register a new listener, will notify all items
+            TestBlocklistListener listener2 = new TestBlocklistListener();
+            handler.registerBlocklistListener(listener2);
+            assertThat(listener2.notifiedTimes).isEqualTo(1);
+            assertThat(listener2.notifiedNodes).containsExactlyInAnyOrder(node1, node2);
+        }
+    }
+
+    @Test
+    void testRemoveTimeoutNodes() throws Exception {
+        // node1 is meant to time out ~1s from now and node2 ~3s from now; the handler created
+        // below checks for timed-out nodes every 100 ms
+        long currentTimestamp = System.currentTimeMillis();
+        BlockedNode node1 = new BlockedNode("node1", "cause", currentTimestamp + 1000L);
+        BlockedNode node2 = new BlockedNode("node2", "cause", currentTimestamp + 3000L);
+
+        TestBlocklistContext context = new TestBlocklistContext();
+        try (DefaultBlocklistHandler handler = createDefaultBlocklistHandler(context)) {
+
+            handler.addNewBlockedNodes(Arrays.asList(node1, node2));
+            assertThat(handler.getAllBlockedNodeIds()).hasSize(2);
+            assertThat(context.allUnBlockedNodes).isEmpty();
+
+            // wait node1 timeout
+            CommonTestUtils.waitUntilCondition(() -> handler.getAllBlockedNodeIds().size() == 1);
+            assertThat(context.allUnBlockedNodes).containsExactly(node1);
+
+            // wait node2 timeout
+            CommonTestUtils.waitUntilCondition(() -> handler.getAllBlockedNodeIds().isEmpty());
+            assertThat(context.allUnBlockedNodes).containsExactly(node1, node2);
+        }
+    }
+
+    @Test
+    void testIsBlockedTaskManager() throws Exception {
+        ResourceID resourceID1 = ResourceID.generate();
+        ResourceID resourceID2 = ResourceID.generate();
+        ResourceID resourceID3 = ResourceID.generate();
+
+        // resourceID1 and resourceID2 live on node1, resourceID3 lives on node2
+        Map<ResourceID, String> taskManagerToNode = new HashMap<>();
+        taskManagerToNode.put(resourceID1, "node1");
+        taskManagerToNode.put(resourceID2, "node1");
+        taskManagerToNode.put(resourceID3, "node2");
+
+        try (DefaultBlocklistHandler handler = createDefaultBlocklistHandler(taskManagerToNode)) {
+
+            handler.addNewBlockedNodes(
+                    Collections.singleton(new BlockedNode("node1", "cause", Long.MAX_VALUE)));
+
+            assertThat(handler.isBlockedTaskManager(resourceID1)).isTrue();
+            assertThat(handler.isBlockedTaskManager(resourceID2)).isTrue();
+            assertThat(handler.isBlockedTaskManager(resourceID3)).isFalse();
+        }
+    }
+
+    /** Creates a handler which maps every task manager to the node id {@code "node"}. */
+    private DefaultBlocklistHandler createDefaultBlocklistHandler(
+            BlocklistContext blocklistContext) {
+        return new DefaultBlocklistHandler(
+                new DefaultBlocklistTracker(),
+                blocklistContext,
+                resourceID -> "node",
+                Time.milliseconds(100L),
+                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                LOG);
+    }
+
+    /** Creates a handler which resolves node ids via the given task manager to node mapping. */
+    private DefaultBlocklistHandler createDefaultBlocklistHandler(
+            Map<ResourceID, String> taskManagerToNode) {
+        return new DefaultBlocklistHandler(
+                new DefaultBlocklistTracker(),
+                new TestBlocklistContext(),
+                taskManagerToNode::get,
+                Time.milliseconds(100L),
+                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                LOG);
+    }
+
+    /** Listener recording how often it was notified and with which nodes. */
+    private static class TestBlocklistListener implements BlocklistListener {
+
+        private int notifiedTimes = 0;
+
+        private final List<BlockedNode> notifiedNodes = new ArrayList<>();
+
+        @Override
+        public void notifyNewBlockedNodes(Collection<BlockedNode> newNodes) {
+            notifiedTimes++;
+            notifiedNodes.addAll(newNodes);
+        }
+    }
+
+    /** Context recording every node passed to it for blocking or unblocking. */
+    private static class TestBlocklistContext implements BlocklistContext {
+        private final List<BlockedNode> allBlockedNodes = new ArrayList<>();
+
+        private final List<BlockedNode> allUnBlockedNodes = new ArrayList<>();
+
+        @Override
+        public void blockResources(Collection<BlockedNode> blockedNodes) {
+            allBlockedNodes.addAll(blockedNodes);
+        }
+
+        @Override
+        public void unblockResources(Collection<BlockedNode> unBlockedNodes) {
+            allUnBlockedNodes.addAll(unBlockedNodes);
+        }
+    }
+}

Reply via email to