This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new ed38718  [FLINK-13421] Exclude releasing root slots from slot allocation
ed38718 is described below

commit ed38718a9b6b1fd55cf2107821813272083a4b63
Author: zhuzhu.zz <[email protected]>
AuthorDate: Wed Jul 31 15:01:33 2019 +0800

    [FLINK-13421] Exclude releasing root slots from slot allocation
    
    Make a MultiTaskSlot unavailable for allocation while it is releasing its
    children, to avoid a ConcurrentModificationException.
    
    This closes #9288.
---
 .../jobmaster/slotpool/SlotSharingManager.java     | 22 +++++----
 .../executiongraph/ExecutionGraphRestartTest.java  | 40 ++++++++++++++++
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 56 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index b2aeed3..f7dd18b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -51,6 +51,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -185,7 +186,7 @@ public class SlotSharingManager {
                        .values()
                        .stream()
                                .flatMap((Map<AllocationID, MultiTaskSlot> map) 
-> map.values().stream())
-                               .filter((MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId))
+                               
.filter(validMultiTaskSlotAndDoesNotContain(groupId))
                                .map((MultiTaskSlot multiTaskSlot) -> {
                                        SlotInfo slotInfo = 
multiTaskSlot.getSlotContextFuture().join();
                                        return new 
SlotSelectionStrategy.SlotInfoAndResources(
@@ -194,6 +195,10 @@ public class SlotSharingManager {
                                }).collect(Collectors.toList());
        }
 
+       private Predicate<MultiTaskSlot> 
validMultiTaskSlotAndDoesNotContain(@Nullable AbstractID groupId) {
+               return (MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId) && !multiTaskSlot.isReleasing();
+       }
+
        @Nullable
        public MultiTaskSlot getResolvedRootSlot(@Nonnull SlotInfo slotInfo) {
                Map<AllocationID, MultiTaskSlot> forLocationEntry = 
resolvedRootSlots.get(slotInfo.getTaskManagerLocation());
@@ -209,13 +214,10 @@ public class SlotSharingManager {
         */
        @Nullable
        MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
-               for (MultiTaskSlot multiTaskSlot : 
unresolvedRootSlots.values()) {
-                       if (!multiTaskSlot.contains(groupId)) {
-                               return multiTaskSlot;
-                       }
-               }
-
-               return null;
+               return unresolvedRootSlots.values().stream()
+                       .filter(validMultiTaskSlotAndDoesNotContain(groupId))
+                       .findFirst()
+                       .orElse(null);
        }
 
        @Override
@@ -627,6 +629,10 @@ public class SlotSharingManager {
                        }
                }
 
+               boolean isReleasing() {
+                       return releasingChildren;
+               }
+
                @Override
                public String toString() {
                        String physicalSlotDescription;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index bf312a7..a885398 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -553,6 +554,45 @@ public class ExecutionGraphRestartTest extends TestLogger {
                }
        }
 
+       /**
+        * SlotPool#failAllocation should not fail with a {@link 
java.util.ConcurrentModificationException}
+        * if there is a concurrent scheduling operation. See FLINK-13421.
+        */
+       @Test
+       public void 
slotPoolExecutionGraph_ConcurrentSchedulingAndAllocationFailure_ShouldNotFailWithConcurrentModificationException()
 throws Exception {
+               final SlotSharingGroup group = new SlotSharingGroup();
+               final JobVertex vertex1 = createNoOpVertex("vertex1", 1);
+               vertex1.setSlotSharingGroup(group);
+               final JobVertex vertex2 = createNoOpVertex("vertex2", 3);
+               vertex2.setSlotSharingGroup(group);
+               final JobVertex vertex3 = createNoOpVertex("vertex3", 1);
+               vertex3.setSlotSharingGroup(group);
+               vertex3.connectNewDataSetAsInput(vertex2, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+               try (SlotPool slotPool = createSlotPoolImpl()) {
+                       final SlotProvider slots = createSchedulerWithSlots(2, 
slotPool, new LocalTaskManagerLocation());
+
+                       final AllocationID allocationId = 
slotPool.getAvailableSlotsInformation().iterator().next().getAllocationId();
+
+                       final ExecutionGraph eg = new 
ExecutionGraphTestUtils.TestingExecutionGraphBuilder(TEST_JOB_ID, vertex1, 
vertex2, vertex3)
+                               .setSlotProvider(slots)
+                               .setAllocationTimeout(Time.minutes(60))
+                               .setScheduleMode(ScheduleMode.EAGER)
+                               .setAllowQueuedScheduling(true)
+                               .build();
+
+                       eg.start(mainThreadExecutor);
+
+                       eg.scheduleForExecution();
+
+                       slotPool.failAllocation(
+                               allocationId,
+                               new Exception("test exception"));
+
+                       eg.waitUntilTerminal();
+               }
+       }
+
        @Test
        public void testRestartWithEagerSchedulingAndSlotSharing() throws 
Exception {
                // this test is inconclusive if not used with a proper 
multi-threaded executor
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index ecf2e4e..04ef8b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -524,6 +525,61 @@ public class SlotSharingManagerTest extends TestLogger {
                assertEquals(rootSlot1.getSlotRequestId(), 
resolvedRootSlot.getSlotRequestId());
        }
 
+       /**
+        * Tests that we cannot retrieve a slot when it's releasing children.
+        */
+       @Test
+       public void testResolvedSlotInReleasingIsNotAvailable() throws 
Exception {
+               final TestingAllocatedSlotActions allocatedSlotActions = new 
TestingAllocatedSlotActions();
+
+               final SlotSharingManager slotSharingManager = new 
SlotSharingManager(
+                       SLOT_SHARING_GROUP_ID,
+                       allocatedSlotActions,
+                       SLOT_OWNER);
+
+               final SlotSharingManager.MultiTaskSlot rootSlot = 
slotSharingManager.createRootSlot(
+                       new SlotRequestId(),
+                       CompletableFuture.completedFuture(
+                               new SimpleSlotContext(
+                                       new AllocationID(),
+                                       new LocalTaskManagerLocation(),
+                                       0,
+                                       new SimpleAckingTaskManagerGateway())),
+                       new SlotRequestId());
+
+               final AbstractID groupId1 = new AbstractID();
+               final SlotSharingManager.SingleTaskSlot singleTaskSlot = 
rootSlot.allocateSingleTaskSlot(
+                       new SlotRequestId(),
+                       ResourceProfile.UNKNOWN,
+                       groupId1,
+                       Locality.UNCONSTRAINED);
+
+               final AtomicBoolean verified = new AtomicBoolean(false);
+
+               final AbstractID groupId2 = new AbstractID();
+               // register a verification in MultiTaskSlot's children 
releasing loop
+               
singleTaskSlot.getLogicalSlotFuture().get().tryAssignPayload(new 
LogicalSlot.Payload() {
+                       @Override
+                       public void fail(Throwable cause) {
+                               assertEquals(0, 
slotSharingManager.listResolvedRootSlotInfo(groupId2).size());
+
+                               verified.set(true);
+                       }
+
+                       @Override
+                       public CompletableFuture<?> getTerminalStateFuture() {
+                               return null;
+                       }
+               });
+
+               assertEquals(1, 
slotSharingManager.listResolvedRootSlotInfo(groupId2).size());
+
+               rootSlot.release(new Exception("test exception"));
+
+               // ensure the verification in Payload#fail is passed
+               assertTrue(verified.get());
+       }
+
        @Test
        public void testGetUnresolvedSlot() {
                final TestingAllocatedSlotActions allocatedSlotActions = new 
TestingAllocatedSlotActions();

Reply via email to