Repository: flink
Updated Branches:
  refs/heads/release-1.5 7763f7f68 -> c74f80869
Revert "[FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution" This reverts commit d9c669d4781f095806013651c1a579eae0ca2650. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c74f8086 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c74f8086 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c74f8086 Branch: refs/heads/release-1.5 Commit: c74f80869f3407ca8d02e79fbcf8a5b267ea7253 Parents: 7763f7f Author: Till Rohrmann <[email protected]> Authored: Fri Mar 16 11:05:57 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Mar 16 14:42:34 2018 +0100 ---------------------------------------------------------------------- .../instance/SlotSharingGroupAssignment.java | 46 +++++------- .../SlotSharingGroupAssignmentTest.java | 79 -------------------- 2 files changed, 20 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c74f8086/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java index 289762c..e61ba58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java @@ -97,7 +97,7 @@ public class SlotSharingGroupAssignment { private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>(); /** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */ - private final Map<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new 
LinkedHashMap<>(); + private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>(); // -------------------------------------------------------------------------------------------- @@ -234,7 +234,7 @@ public class SlotSharingGroupAssignment { // can place a task into this slot. boolean entryForNewJidExists = false; - for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) { + for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) { // there is already an entry for this groupID if (entry.getKey().equals(groupIdForMap)) { entryForNewJidExists = true; @@ -247,7 +247,7 @@ public class SlotSharingGroupAssignment { // make sure an empty entry exists for this group, if no other entry exists if (!entryForNewJidExists) { - availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<>()); + availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>()); } return subSlot; @@ -393,7 +393,7 @@ public class SlotSharingGroupAssignment { } // get the available slots for the group - LinkedHashMap<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId); + Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId); if (slotsForGroup == null) { // we have a new group, so all slots are available @@ -624,26 +624,20 @@ public class SlotSharingGroupAssignment { private static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>> map) { Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = map.entrySet().iterator(); - + while (iter.hasNext()) { - Map.Entry<ResourceID, List<SharedSlot>> slotEntry = iter.next(); - - // remove first entry to add it at the back if there are still slots left - iter.remove(); - - List<SharedSlot> slots = slotEntry.getValue(); - - if (!slots.isEmpty()) { - - SharedSlot result = slots.remove(slots.size() - 1); - - if 
(!slots.isEmpty()) { - // reinserts the entry; since it is a LinkedHashMap, we will iterate over this entry - // only after having polled from all other entries - map.put(slotEntry.getKey(), slots); - } - - return result; + List<SharedSlot> slots = iter.next().getValue(); + + if (slots.isEmpty()) { + iter.remove(); + } + else if (slots.size() == 1) { + SharedSlot slot = slots.remove(0); + iter.remove(); + return slot; + } + else { + return slots.remove(slots.size() - 1); } } @@ -651,11 +645,11 @@ public class SlotSharingGroupAssignment { } private static void removeSlotFromAllEntries( - Map<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> availableSlots, - SharedSlot slot) { + Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlots, SharedSlot slot) + { final ResourceID taskManagerId = slot.getTaskManagerID(); - for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) { + for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) { Map<ResourceID, List<SharedSlot>> map = entry.getValue(); List<SharedSlot> list = map.get(taskManagerId); http://git-wip-us.apache.org/repos/asf/flink/blob/c74f8086/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java deleted file mode 100644 index 2407c1d..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.Locality; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.SlotOwner; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collections; - -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; - -public class SlotSharingGroupAssignmentTest extends TestLogger { - - /** - * Tests that slots are allocated in a round robin fashion from the set of available resources. 
- */ - @Test - public void testRoundRobinPolling() throws UnknownHostException { - final SlotSharingGroupAssignment slotSharingGroupAssignment = new SlotSharingGroupAssignment(); - final int numberTaskManagers = 2; - final int numberSlots = 2; - final JobVertexID sourceId = new JobVertexID(); - final JobVertexID sinkId = new JobVertexID(); - - for (int i = 0; i < numberTaskManagers; i++) { - final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000); - - for (int j = 0; j < numberSlots; j++) { - final SharedSlot slot = new SharedSlot( - mock(SlotOwner.class), - taskManagerLocation, - j, - mock(TaskManagerGateway.class), - slotSharingGroupAssignment); - - slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(slot, Locality.UNKNOWN, sourceId); - } - } - - SimpleSlot allocatedSlot1 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList()); - SimpleSlot allocatedSlot2 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList()); - - assertNotEquals(allocatedSlot1.getTaskManagerLocation(), allocatedSlot2.getTaskManagerLocation()); - - // let's check that we can still allocate all 4 slots - SimpleSlot allocatedSlot3 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList()); - assertNotNull(allocatedSlot3); - - SimpleSlot allocatedSlot4 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList()); - assertNotNull(allocatedSlot4); - } -}
