This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b174deec01552f28b6ca1003d07831396ce48117 Author: Lijie Wang <[email protected]> AuthorDate: Mon Jun 27 14:51:47 2022 +0800 [FLINK-28143][runtime] Introduce BlocklistHandler. This closes #20079. --- .../flink/runtime/blocklist/BlocklistContext.java | 39 +++++ .../flink/runtime/blocklist/BlocklistHandler.java | 67 ++++++++ .../flink/runtime/blocklist/BlocklistListener.java | 32 ++++ .../runtime/blocklist/DefaultBlocklistHandler.java | 178 +++++++++++++++++++++ .../blocklist/DefaultBlocklistHandlerTest.java | 173 ++++++++++++++++++++ 5 files changed, 489 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java new file mode 100644 index 00000000000..c31adbf5410 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import java.util.Collection; + +/** This class is responsible for blocking and unblocking resources. */ +public interface BlocklistContext { + + /** + * Block resources on the nodes. 
+ * + * @param blockedNodes the nodes whose resources should be blocked + */ + void blockResources(Collection<BlockedNode> blockedNodes); + + /** + * Unblock resources on the nodes. + * + * @param unBlockedNodes the nodes whose resources should be unblocked + */ + void unblockResources(Collection<BlockedNode> unBlockedNodes); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java new file mode 100644 index 00000000000..9247595125f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +import java.util.Collection; +import java.util.Set; + +/** + * This class is responsible for managing all {@link BlockedNode}s and applying them to resources. + */ +public interface BlocklistHandler { + + /** + * Add new blocked node records. If a node (identified by node id) already exists, the newly + * added one will be merged with the existing one. 
+ * + * @param newNodes the new blocked node records + */ + void addNewBlockedNodes(Collection<BlockedNode> newNodes); + + /** + * Returns whether the given task manager is blocked (located on blocked nodes). + * + * @param taskManagerId ID of the task manager to query + * @return true if the given task manager is blocked, otherwise false + */ + boolean isBlockedTaskManager(ResourceID taskManagerId); + + /** + * Get all blocked node ids. + * + * @return a set containing all blocked node ids + */ + Set<String> getAllBlockedNodeIds(); + + /** + * Register a new blocklist listener. + * + * @param blocklistListener the newly registered listener + */ + void registerBlocklistListener(BlocklistListener blocklistListener); + + /** + * Deregister a blocklist listener. + * + * @param blocklistListener the listener to deregister + */ + void deregisterBlocklistListener(BlocklistListener blocklistListener); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java new file mode 100644 index 00000000000..d8a13aa537f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import java.util.Collection; + +/** A listener that wants to be notified when the blocklist changes. */ +public interface BlocklistListener { + + /** + * Notify new blocked node records. + * + * @param newNodes the new blocked node records + */ + void notifyNewBlockedNodes(Collection<BlockedNode> newNodes); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java new file mode 100644 index 00000000000..d1f30dcdf50 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blocklist; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Default implementation of {@link BlocklistHandler}. */ +public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable { + + private final Logger log; + + private final Function<ResourceID, String> taskManagerNodeIdRetriever; + + private final BlocklistTracker blocklistTracker; + + private final BlocklistContext blocklistContext; + + private final Set<BlocklistListener> blocklistListeners = new HashSet<>(); + + private final Time timeoutCheckInterval; + + private volatile ScheduledFuture<?> timeoutCheckFuture; + + private final ComponentMainThreadExecutor mainThreadExecutor; + + public DefaultBlocklistHandler( + BlocklistTracker blocklistTracker, + BlocklistContext blocklistContext, + Function<ResourceID, String> taskManagerNodeIdRetriever, + Time timeoutCheckInterval, + ComponentMainThreadExecutor mainThreadExecutor, + Logger log) { + this.blocklistTracker = checkNotNull(blocklistTracker); + this.blocklistContext = checkNotNull(blocklistContext); + this.taskManagerNodeIdRetriever = checkNotNull(taskManagerNodeIdRetriever); + this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval); + this.mainThreadExecutor = checkNotNull(mainThreadExecutor); + this.log = checkNotNull(log); + + scheduleTimeoutCheck(); + } + + private void scheduleTimeoutCheck() { + this.timeoutCheckFuture = + mainThreadExecutor.schedule( + () -> { + removeTimeoutNodes(); + scheduleTimeoutCheck(); + }, + 
timeoutCheckInterval.toMilliseconds(), + TimeUnit.MILLISECONDS); + } + + private void removeTimeoutNodes() { + Collection<BlockedNode> removedNodes = + blocklistTracker.removeTimeoutNodes(System.currentTimeMillis()); + if (!removedNodes.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug( + "Remove {} timeout blocked nodes, details {}. " + + "Total {} blocked nodes currently, details: {}.", + removedNodes.size(), + removedNodes, + blocklistTracker.getAllBlockedNodes().size(), + blocklistTracker.getAllBlockedNodes()); + } else { + log.info( + "Remove {} timeout blocked nodes. Total {} blocked nodes currently.", + removedNodes.size(), + blocklistTracker.getAllBlockedNodes().size()); + } + blocklistContext.unblockResources(removedNodes); + } + } + + private void assertRunningInMainThread() { + mainThreadExecutor.assertRunningInMainThread(); + } + + @Override + public void addNewBlockedNodes(Collection<BlockedNode> newNodes) { + assertRunningInMainThread(); + + Collection<BlockedNode> newlyAddedOrMerged = blocklistTracker.addNewBlockedNodes(newNodes); + if (!newlyAddedOrMerged.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug( + "Newly added/merged {} blocked nodes, details: {}." + + " Total {} blocked nodes currently, details: {}.", + newlyAddedOrMerged.size(), + newlyAddedOrMerged, + blocklistTracker.getAllBlockedNodes().size(), + blocklistTracker.getAllBlockedNodes()); + } else { + log.info( + "Newly added/merged {} blocked nodes. 
Total {} blocked nodes currently.", + newlyAddedOrMerged.size(), + blocklistTracker.getAllBlockedNodes().size()); + } + + blocklistListeners.forEach( + listener -> listener.notifyNewBlockedNodes(newlyAddedOrMerged)); + blocklistContext.blockResources(newlyAddedOrMerged); + } + } + + @Override + public boolean isBlockedTaskManager(ResourceID taskManagerId) { + assertRunningInMainThread(); + String nodeId = checkNotNull(taskManagerNodeIdRetriever.apply(taskManagerId)); + return blocklistTracker.isBlockedNode(nodeId); + } + + @Override + public Set<String> getAllBlockedNodeIds() { + assertRunningInMainThread(); + + return blocklistTracker.getAllBlockedNodeIds(); + } + + @Override + public void registerBlocklistListener(BlocklistListener blocklistListener) { + assertRunningInMainThread(); + + checkNotNull(blocklistListener); + if (!blocklistListeners.contains(blocklistListener)) { + blocklistListeners.add(blocklistListener); + Collection<BlockedNode> allBlockedNodes = blocklistTracker.getAllBlockedNodes(); + if (!allBlockedNodes.isEmpty()) { + blocklistListener.notifyNewBlockedNodes(allBlockedNodes); + } + } + } + + @Override + public void deregisterBlocklistListener(BlocklistListener blocklistListener) { + assertRunningInMainThread(); + + checkNotNull(blocklistListener); + blocklistListeners.remove(blocklistListener); + } + + @Override + public void close() throws Exception { + if (timeoutCheckFuture != null) { + timeoutCheckFuture.cancel(false); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java new file mode 100644 index 00000000000..ff0ea85fffe --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.testutils.CommonTestUtils; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link DefaultBlocklistHandler}. 
*/ +class DefaultBlocklistHandlerTest { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class); + + @Test + void testAddNewBlockedNodes() throws Exception { + BlockedNode node1 = new BlockedNode("node1", "cause", Long.MAX_VALUE); + BlockedNode node2 = new BlockedNode("node2", "cause", Long.MAX_VALUE); + + TestBlocklistContext context = new TestBlocklistContext(); + TestBlocklistListener listener = new TestBlocklistListener(); + + try (DefaultBlocklistHandler handler = createDefaultBlocklistHandler(context)) { + handler.registerBlocklistListener(listener); + assertThat(listener.notifiedTimes).isEqualTo(0); + assertThat(listener.notifiedNodes).isEmpty(); + assertThat(context.allBlockedNodes).isEmpty(); + + // add node1, node2 + handler.addNewBlockedNodes(Arrays.asList(node1, node2)); + // check listener and context + assertThat(listener.notifiedTimes).isEqualTo(1); + assertThat(listener.notifiedNodes).containsExactlyInAnyOrder(node1, node2); + assertThat(context.allBlockedNodes).containsExactlyInAnyOrder(node1, node2); + + // add node1, node2 again, should not notify listener + handler.addNewBlockedNodes(Arrays.asList(node1, node2)); + assertThat(listener.notifiedTimes).isEqualTo(1); + + // register a new listener, will notify all items + TestBlocklistListener listener2 = new TestBlocklistListener(); + handler.registerBlocklistListener(listener2); + assertThat(listener2.notifiedTimes).isEqualTo(1); + assertThat(listener2.notifiedNodes).containsExactlyInAnyOrder(node1, node2); + } + } + + @Test + void testRemoveTimeoutNodes() throws Exception { + long currentTimestamp = System.currentTimeMillis(); + BlockedNode node1 = new BlockedNode("node1", "cause", currentTimestamp + 1000L); + BlockedNode node2 = new BlockedNode("node2", "cause", currentTimestamp + 3000L); + + TestBlocklistContext context = new TestBlocklistContext(); + try (DefaultBlocklistHandler handler = createDefaultBlocklistHandler(context)) { + + 
handler.addNewBlockedNodes(Arrays.asList(node1, node2)); + assertThat(handler.getAllBlockedNodeIds()).hasSize(2); + assertThat(context.allUnBlockedNodes).hasSize(0); + + // wait node1 timeout + CommonTestUtils.waitUntilCondition(() -> handler.getAllBlockedNodeIds().size() == 1); + assertThat(context.allUnBlockedNodes).containsExactly(node1); + + // wait node2 timeout + CommonTestUtils.waitUntilCondition(() -> handler.getAllBlockedNodeIds().size() == 0); + assertThat(context.allUnBlockedNodes).containsExactly(node1, node2); + } + } + + @Test + void testIsBlockedTaskManager() throws Exception { + ResourceID resourceID1 = ResourceID.generate(); + ResourceID resourceID2 = ResourceID.generate(); + ResourceID resourceID3 = ResourceID.generate(); + + Map<ResourceID, String> taskManagerToNode = new HashMap<>(); + taskManagerToNode.put(resourceID1, "node1"); + taskManagerToNode.put(resourceID2, "node1"); + taskManagerToNode.put(resourceID3, "node2"); + + try (DefaultBlocklistHandler handler = createDefaultBlocklistHandler(taskManagerToNode)) { + + handler.addNewBlockedNodes( + Collections.singleton(new BlockedNode("node1", "cause", Long.MAX_VALUE))); + + assertThat(handler.isBlockedTaskManager(resourceID1)).isTrue(); + assertThat(handler.isBlockedTaskManager(resourceID2)).isTrue(); + assertThat(handler.isBlockedTaskManager(resourceID3)).isFalse(); + } + } + + private DefaultBlocklistHandler createDefaultBlocklistHandler( + BlocklistContext blocklistContext) { + return new DefaultBlocklistHandler( + new DefaultBlocklistTracker(), + blocklistContext, + resourceID -> "node", + Time.milliseconds(100L), + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + LOG); + } + + private DefaultBlocklistHandler createDefaultBlocklistHandler( + Map<ResourceID, String> taskManagerToNode) { + return new DefaultBlocklistHandler( + new DefaultBlocklistTracker(), + new TestBlocklistContext(), + taskManagerToNode::get, + Time.milliseconds(100L), + 
ComponentMainThreadExecutorServiceAdapter.forMainThread(), + LOG); + } + + private static class TestBlocklistListener implements BlocklistListener { + + private int notifiedTimes = 0; + + private final List<BlockedNode> notifiedNodes = new ArrayList<>(); + + @Override + public void notifyNewBlockedNodes(Collection<BlockedNode> newNodes) { + notifiedTimes++; + notifiedNodes.addAll(newNodes); + } + } + + private static class TestBlocklistContext implements BlocklistContext { + private final List<BlockedNode> allBlockedNodes = new ArrayList<>(); + + private final List<BlockedNode> allUnBlockedNodes = new ArrayList<>(); + + @Override + public void blockResources(Collection<BlockedNode> blockedNodes) { + allBlockedNodes.addAll(blockedNodes); + } + + @Override + public void unblockResources(Collection<BlockedNode> unBlockedNodes) { + allUnBlockedNodes.addAll(unBlockedNodes); + } + } +}
