This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72ea8b5999bf36125aa5f1a38df4ec52c7a95702 Author: Lijie Wang <[email protected]> AuthorDate: Mon Jul 4 15:04:02 2022 +0800 [FLINK-28144][runtime] Introduce blocklist handler factory and a no-op blocklist handler. --- .../flink/runtime/blocklist/BlocklistHandler.java | 23 ++++++++ .../flink/runtime/blocklist/BlocklistUtils.java | 45 +++++++++++++++ .../runtime/blocklist/DefaultBlocklistHandler.java | 35 ++++++++++-- .../runtime/blocklist/NoOpBlocklistHandler.java | 64 ++++++++++++++++++++++ .../blocklist/DefaultBlocklistHandlerTest.java | 8 +-- 5 files changed, 166 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java index 9247595125f..f07c2ae859f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java @@ -19,9 +19,13 @@ package org.apache.flink.runtime.blocklist; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; + +import org.slf4j.Logger; import java.util.Collection; import java.util.Set; +import java.util.function.Function; /** * This class is responsible for managing all {@link BlockedNode}s and performing them on resources. @@ -64,4 +68,23 @@ public interface BlocklistHandler { * @param blocklistListener the listener to deregister */ void deregisterBlocklistListener(BlocklistListener blocklistListener); + + /** Factory to instantiate {@link BlocklistHandler}. */ + interface Factory { + + /** + * Instantiates a {@link BlocklistHandler}. + * + * @param blocklistContext the blocklist context + * @param taskManagerNodeIdRetriever to map a task manager to the node it's located on + * @param mainThreadExecutor to schedule the timeout check + * @param log the logger + * @return an instantiated blocklist handler. + */ + BlocklistHandler create( + BlocklistContext blocklistContext, + Function<ResourceID, String> taskManagerNodeIdRetriever, + ComponentMainThreadExecutor mainThreadExecutor, + Logger log); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistUtils.java new file mode 100644 index 00000000000..f4b61ee62a3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.SlowTaskDetectorOptions; + +/** Utility class for blocklist. */ +public class BlocklistUtils { + + public static BlocklistHandler.Factory loadBlocklistHandlerFactory( + Configuration configuration) { + if (isBlocklistEnabled(configuration)) { + return new DefaultBlocklistHandler.Factory( + configuration.get(SlowTaskDetectorOptions.CHECK_INTERVAL)); + } else { + return new NoOpBlocklistHandler.Factory(); + } + } + + public static boolean isBlocklistEnabled(Configuration configuration) { + // Currently, only enable blocklist for speculative execution + return configuration.getBoolean(JobManagerOptions.SPECULATIVE_ENABLED); + } + + /** Private default constructor to avoid being instantiated. */ + private BlocklistUtils() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java index 46fb6a54247..a88ab431727 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.blocklist; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.slf4j.Logger; +import java.time.Duration; import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -46,17 +46,17 @@ public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable private final Set<BlocklistListener> blocklistListeners = new HashSet<>(); - private final Time timeoutCheckInterval; + private final Duration timeoutCheckInterval; private volatile ScheduledFuture<?> timeoutCheckFuture; private final ComponentMainThreadExecutor mainThreadExecutor; - public DefaultBlocklistHandler( + DefaultBlocklistHandler( BlocklistTracker blocklistTracker, BlocklistContext blocklistContext, Function<ResourceID, String> taskManagerNodeIdRetriever, - Time timeoutCheckInterval, + Duration timeoutCheckInterval, ComponentMainThreadExecutor mainThreadExecutor, Logger log) { this.blocklistTracker = checkNotNull(blocklistTracker); @@ -76,7 +76,7 @@ public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable removeTimeoutNodes(); scheduleTimeoutCheck(); }, - timeoutCheckInterval.toMilliseconds(), + timeoutCheckInterval.toMillis(), TimeUnit.MILLISECONDS); } @@ -176,4 +176,29 @@ public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable timeoutCheckFuture.cancel(false); } } + + /** The factory to instantiate {@link DefaultBlocklistHandler}. */ + public static class Factory implements BlocklistHandler.Factory { + + private final Duration timeoutCheckInterval; + + public Factory(Duration timeoutCheckInterval) { + this.timeoutCheckInterval = checkNotNull(timeoutCheckInterval); + } + + @Override + public BlocklistHandler create( + BlocklistContext blocklistContext, + Function<ResourceID, String> taskManagerNodeIdRetriever, + ComponentMainThreadExecutor mainThreadExecutor, + Logger log) { + return new DefaultBlocklistHandler( + new DefaultBlocklistTracker(), + blocklistContext, + taskManagerNodeIdRetriever, + timeoutCheckInterval, + mainThreadExecutor, + log); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/NoOpBlocklistHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/NoOpBlocklistHandler.java new file mode 100644 index 00000000000..d75725512dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/NoOpBlocklistHandler.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +/** No-op implementation of {@link BlocklistHandler}. */ +public class NoOpBlocklistHandler implements BlocklistHandler { + @Override + public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {} + + @Override + public boolean isBlockedTaskManager(ResourceID taskManagerId) { + return false; + } + + @Override + public Set<String> getAllBlockedNodeIds() { + return Collections.emptySet(); + } + + @Override + public void registerBlocklistListener(BlocklistListener blocklistListener) {} + + @Override + public void deregisterBlocklistListener(BlocklistListener blocklistListener) {} + + /** The factory to instantiate {@link NoOpBlocklistHandler}. */ + public static class Factory implements BlocklistHandler.Factory { + + @Override + public BlocklistHandler create( + BlocklistContext blocklistContext, + Function<ResourceID, String> taskManagerNodeIdRetriever, + ComponentMainThreadExecutor mainThreadExecutor, + Logger log) { + return new NoOpBlocklistHandler(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java index a10878047ab..f6092d416d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.blocklist; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; @@ -30,6 +29,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -157,7 +157,7 @@ class DefaultBlocklistHandlerTest { new DefaultBlocklistTracker(), blocklistContext, resourceID -> "node", - Time.milliseconds(100L), + Duration.ofMillis(100L), ComponentMainThreadExecutorServiceAdapter.forMainThread(), LOG); } @@ -168,7 +168,7 @@ class DefaultBlocklistHandlerTest { new DefaultBlocklistTracker(), TestBlocklistContext.newBuilder().build(), taskManagerToNode::get, - Time.milliseconds(100L), + Duration.ofMillis(100L), ComponentMainThreadExecutorServiceAdapter.forMainThread(), LOG); } @@ -179,7 +179,7 @@ class DefaultBlocklistHandlerTest { new DefaultBlocklistTracker(), blocklistContext, resourceID -> "node", - Time.milliseconds(100L), + Duration.ofMillis(100L), mainThreadExecutor, LOG); }
