Repository: flink
Updated Branches:
  refs/heads/master cb60fd29e -> 91d346e9e


Revert "[FLINK-7851] [scheduling] Improve scheduling balance by round robin 
distribution"

This reverts commit d9c669d4781f095806013651c1a579eae0ca2650.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91d346e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91d346e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91d346e9

Branch: refs/heads/master
Commit: 91d346e9e7611be530509154cc7034cbde22653d
Parents: cb60fd2
Author: Till Rohrmann <[email protected]>
Authored: Fri Mar 16 11:04:44 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Mar 16 14:41:18 2018 +0100

----------------------------------------------------------------------
 .../instance/SlotSharingGroupAssignment.java    | 46 +++++-------
 .../SlotSharingGroupAssignmentTest.java         | 79 --------------------
 2 files changed, 20 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91d346e9/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 289762c..e61ba58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -97,7 +97,7 @@ public class SlotSharingGroupAssignment {
        private final Set<SharedSlot> allSlots = new 
LinkedHashSet<SharedSlot>();
 
        /** The slots available per vertex type (JobVertexId), keyed by 
TaskManager, to make them locatable */
-       private final Map<AbstractID, LinkedHashMap<ResourceID, 
List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
+       private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> 
availableSlotsPerJid = new LinkedHashMap<>();
 
 
        // 
--------------------------------------------------------------------------------------------
@@ -234,7 +234,7 @@ public class SlotSharingGroupAssignment {
                                // can place a task into this slot.
                                boolean entryForNewJidExists = false;
                                
-                               for (Map.Entry<AbstractID, 
LinkedHashMap<ResourceID, List<SharedSlot>>> entry : 
availableSlotsPerJid.entrySet()) {
+                               for (Map.Entry<AbstractID, Map<ResourceID, 
List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
                                        // there is already an entry for this 
groupID
                                        if 
(entry.getKey().equals(groupIdForMap)) {
                                                entryForNewJidExists = true;
@@ -247,7 +247,7 @@ public class SlotSharingGroupAssignment {
 
                                // make sure an empty entry exists for this 
group, if no other entry exists
                                if (!entryForNewJidExists) {
-                                       availableSlotsPerJid.put(groupIdForMap, 
new LinkedHashMap<>());
+                                       availableSlotsPerJid.put(groupIdForMap, 
new LinkedHashMap<ResourceID, List<SharedSlot>>());
                                }
 
                                return subSlot;
@@ -393,7 +393,7 @@ public class SlotSharingGroupAssignment {
                }
 
                // get the available slots for the group
-               LinkedHashMap<ResourceID, List<SharedSlot>> slotsForGroup = 
availableSlotsPerJid.get(groupId);
+               Map<ResourceID, List<SharedSlot>> slotsForGroup = 
availableSlotsPerJid.get(groupId);
                
                if (slotsForGroup == null) {
                        // we have a new group, so all slots are available
@@ -624,26 +624,20 @@ public class SlotSharingGroupAssignment {
        
        private static SharedSlot pollFromMultiMap(Map<ResourceID, 
List<SharedSlot>> map) {
                Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = 
map.entrySet().iterator();
-
+               
                while (iter.hasNext()) {
-                       Map.Entry<ResourceID, List<SharedSlot>> slotEntry = 
iter.next();
-
-                       // remove first entry to add it at the back if there 
are still slots left
-                       iter.remove();
-
-                       List<SharedSlot> slots = slotEntry.getValue();
-
-                       if (!slots.isEmpty()) {
-
-                               SharedSlot result = slots.remove(slots.size() - 
1);
-
-                               if (!slots.isEmpty()) {
-                                       // reinserts the entry; since it is a 
LinkedHashMap, we will iterate over this entry
-                                       // only after having polled from all 
other entries
-                                       map.put(slotEntry.getKey(), slots);
-                               }
-
-                               return result;
+                       List<SharedSlot> slots = iter.next().getValue();
+                       
+                       if (slots.isEmpty()) {
+                               iter.remove();
+                       }
+                       else if (slots.size() == 1) {
+                               SharedSlot slot = slots.remove(0);
+                               iter.remove();
+                               return slot;
+                       }
+                       else {
+                               return slots.remove(slots.size() - 1);
                        }
                }
                
@@ -651,11 +645,11 @@ public class SlotSharingGroupAssignment {
        }
        
        private static void removeSlotFromAllEntries(
-                       Map<AbstractID, LinkedHashMap<ResourceID, 
List<SharedSlot>>> availableSlots,
-                       SharedSlot slot) {
+                       Map<AbstractID, Map<ResourceID, List<SharedSlot>>> 
availableSlots, SharedSlot slot)
+       {
                final ResourceID taskManagerId = slot.getTaskManagerID();
                
-               for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, 
List<SharedSlot>>> entry : availableSlots.entrySet()) {
+               for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> 
entry : availableSlots.entrySet()) {
                        Map<ResourceID, List<SharedSlot>> map = 
entry.getValue();
 
                        List<SharedSlot> list = map.get(taskManagerId);

http://git-wip-us.apache.org/repos/asf/flink/blob/91d346e9/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
deleted file mode 100644
index 2407c1d..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
-
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
-public class SlotSharingGroupAssignmentTest extends TestLogger {
-
-       /**
-        * Tests that slots are allocated in a round robin fashion from the set 
of available resources.
-        */
-       @Test
-       public void testRoundRobinPolling() throws UnknownHostException {
-               final SlotSharingGroupAssignment slotSharingGroupAssignment = 
new SlotSharingGroupAssignment();
-               final int numberTaskManagers = 2;
-               final int numberSlots = 2;
-               final JobVertexID sourceId = new JobVertexID();
-               final JobVertexID sinkId = new JobVertexID();
-
-               for (int i = 0; i < numberTaskManagers; i++) {
-                       final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 
1000);
-
-                       for (int j = 0; j < numberSlots; j++) {
-                               final SharedSlot slot = new SharedSlot(
-                                       mock(SlotOwner.class),
-                                       taskManagerLocation,
-                                       j,
-                                       mock(TaskManagerGateway.class),
-                                       slotSharingGroupAssignment);
-
-                               
slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(slot, 
Locality.UNKNOWN, sourceId);
-                       }
-               }
-
-               SimpleSlot allocatedSlot1 = 
slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
-               SimpleSlot allocatedSlot2 = 
slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
-
-               assertNotEquals(allocatedSlot1.getTaskManagerLocation(), 
allocatedSlot2.getTaskManagerLocation());
-
-               // let's check that we can still allocate all 4 slots
-               SimpleSlot allocatedSlot3 = 
slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
-               assertNotNull(allocatedSlot3);
-
-               SimpleSlot allocatedSlot4 = 
slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
-               assertNotNull(allocatedSlot4);
-       }
-}

Reply via email to