zhuzhurk commented on code in PR #20153: URL: https://github.com/apache/flink/pull/20153#discussion_r917219894
########## flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistUtils.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.SlowTaskDetectorOptions; + +/** Utility class for blocklist. */ +public class BlocklistUtils { + + public static BlocklistHandler.Factory loadBlocklistHandlerFactory( + Configuration configuration) { + if (isBlocklistEnabled(configuration)) { + return new DefaultBlocklistHandler.Factory( + Time.fromDuration(configuration.get(SlowTaskDetectorOptions.CHECK_INTERVAL))); Review Comment: Let's use `Duration` instead of `Time` in `DefaultBlocklistHandler`. The usage of `Time` is a legacy problem and should be avoided if possible. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java: ########## @@ -64,4 +68,23 @@ public interface BlocklistHandler { * @param blocklistListener the listener to deregister */ void deregisterBlocklistListener(BlocklistListener blocklistListener); + + /** Factory to instantiate {@link BlocklistHandler}. */ + interface Factory { + + /** + * Instantiates an {@link BlocklistHandler}. Review Comment: an -> a ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; +import org.apache.flink.runtime.slots.ResourceRequirement; + +import java.util.Collection; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class BlocklistDeclarativeSlotPoolFactory implements DeclarativeSlotPoolFactory { Review Comment: A Javadoc comment is required for this class. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedTaskManagerChecker.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +/** This checker helps to query whether a given task manager is blocked. */ +public interface BlockedTaskManagerChecker { + + /** + * Returns whether the given task manager is located on blocked nodes. Review Comment: blocked nodes -> a blocked node ########## flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistHandler.java: ########## @@ -64,4 +68,23 @@ public interface BlocklistHandler { * @param blocklistListener the listener to deregister */ void deregisterBlocklistListener(BlocklistListener blocklistListener); + + /** Factory to instantiate {@link BlocklistHandler}. */ Review Comment: This change should not be a hotfix. If it is needed, let's just mark it as part of FLINK-28144. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.ResourceCounter; + +import org.junit.jupiter.api.Test; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import 
java.util.stream.Collectors; + +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.FreeSlotConsumer; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.NewSlotsService; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createResourceRequirements; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.drainNewSlotService; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool; +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.getOnlyElement; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.HamcrestCondition.matching; +import static org.hamcrest.Matchers.containsInAnyOrder; + +/** Test for {@link BlocklistDeclarativeSlotPool}. */ +class BlocklistDeclarativeSlotPoolTest { + + private static final ResourceProfile RESOURCE_PROFILE = + ResourceProfile.newBuilder().setCpuCores(1.7).build(); + + @Test + void testOfferSlotsFromBlockedTaskManager() throws Exception { + testOfferSlots(true); + } + + @Test + void testOfferSlotsFromUnblockedTaskManager() throws Exception { + testOfferSlots(false); + } + + private void testOfferSlots(boolean isBlocked) throws Exception { + final TaskManagerLocation taskManager = new LocalTaskManagerLocation(); + + final NewSlotsService notifyNewSlots = new NewSlotsService(); + // mark task manager as blocked. + final BlocklistDeclarativeSlotPool slotPool = + BlocklistDeclarativeSlotPoolBuilder.builder() + .setBlockedTaskManagerChecker( + isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false) + .build(); + slotPool.registerNewSlotsListener(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + // offer slots on the blocked task manager + Collection<SlotOffer> slotOffers = + createSlotOffersForResourceRequirements(resourceRequirements); + + if (isBlocked) { + assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, taskManager)).isEmpty(); + assertThat(drainNewSlotService(notifyNewSlots)).isEmpty(); + } else { + assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, taskManager)) + .containsExactlyInAnyOrderElementsOf(slotOffers); + assertThat(drainNewSlotService(notifyNewSlots)) + .is( + matching( Review Comment: I would avoid using Hamcrest in new tests. One possible way to do the verification is to build a `Map<AllocationID, SlotOffer> slotOfferMap` from `slotOffers` and introduce a method `boolean matchSlotToOffers(PhysicalSlot, SlotOffer)`; the assertion can then be: ``` assertThat(drainNewSlotService(notifyNewSlots)).allMatch(slot -> matchSlotToOffers(slot, slotOfferMap.remove(slot.getAllocationId()))); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
