This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2216ec5  YARN-9100. Add tests for GpuResourceAllocator and do minor code cleanup. Contributed by Peter Bacsko
2216ec5 is described below

commit 2216ec54e58e24ff09620fc2efa2f1733391d0c3
Author: Szilard Nemeth <snem...@apache.org>
AuthorDate: Fri Aug 16 09:13:20 2019 +0200

    YARN-9100. Add tests for GpuResourceAllocator and do minor code cleanup. Contributed by Peter Bacsko
---
 .../linux/resources/gpu/GpuResourceAllocator.java  | 106 ++---
 .../resources/gpu/GpuResourceHandlerImpl.java      |   2 +-
 .../resourceplugin/gpu/GpuResourcePlugin.java      |   4 +-
 .../resources/gpu/TestGpuResourceAllocator.java    | 442 +++++++++++++++++++++
 .../resources/gpu/TestGpuResourceHandlerImpl.java  |   8 +-
 5 files changed, 509 insertions(+), 53 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceAllocator.java
index 0b95ca7..2300776 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceAllocator.java
@@ -19,6 +19,8 @@
 package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
@@ -38,34 +40,44 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
 
 /**
- * Allocate GPU resources according to requirements
+ * Allocate GPU resources according to requirements.
  */
 public class GpuResourceAllocator {
   final static Logger LOG = LoggerFactory.
       getLogger(GpuResourceAllocator.class);
+
   private static final int WAIT_MS_PER_LOOP = 1000;
 
   private Set<GpuDevice> allowedGpuDevices = new TreeSet<>();
   private Map<GpuDevice, ContainerId> usedDevices = new TreeMap<>();
   private Context nmContext;
+  private final int waitPeriodForResource;
 
   public GpuResourceAllocator(Context ctx) {
     this.nmContext = ctx;
+    // Wait for a maximum of 120 seconds if no available GPU are there which
+    // are yet to be released.
+    this.waitPeriodForResource = 120 * WAIT_MS_PER_LOOP;
+  }
+
+  @VisibleForTesting
+  GpuResourceAllocator(Context ctx, int waitPeriodForResource) {
+    this.nmContext = ctx;
+    this.waitPeriodForResource = waitPeriodForResource;
   }
 
   /**
-   * Contains allowed and denied devices
+   * Contains allowed and denied devices.
    * Denied devices will be useful for cgroups devices module to do 
blacklisting
    */
   static class GpuAllocation {
@@ -91,20 +103,13 @@ public class GpuResourceAllocator {
   }
 
   /**
-   * Add GPU to allowed list
+   * Add GPU to the allowed list of GPUs.
    * @param gpuDevice gpu device
    */
   public synchronized void addGpu(GpuDevice gpuDevice) {
     allowedGpuDevices.add(gpuDevice);
   }
 
-  private String getResourceHandlerExceptionMessage(int numRequestedGpuDevices,
-      ContainerId containerId) {
-    return "Failed to find enough GPUs, requestor=" + containerId
-        + ", #RequestedGPUs=" + numRequestedGpuDevices + ", #availableGpus="
-        + getAvailableGpus();
-  }
-
   @VisibleForTesting
   public synchronized int getAvailableGpus() {
     return allowedGpuDevices.size() - usedDevices.size();
@@ -113,10 +118,10 @@ public class GpuResourceAllocator {
   public synchronized void recoverAssignedGpus(ContainerId containerId)
       throws ResourceHandlerException {
     Container c = nmContext.getContainers().get(containerId);
-    if (null == c) {
+    if (c == null) {
       throw new ResourceHandlerException(
-          "This shouldn't happen, cannot find container with id="
-              + containerId);
+          "Cannot find container with id=" + containerId +
+              ", this should not occur under normal circumstances!");
     }
 
     LOG.info("Starting recovery of GpuDevice for {}.", containerId);
@@ -125,7 +130,8 @@ public class GpuResourceAllocator {
       if (!(gpuDeviceSerializable instanceof GpuDevice)) {
         throw new ResourceHandlerException(
             "Trying to recover device id, however it"
-                + " is not GpuDevice, this shouldn't happen");
+                + " is not an instance of " + GpuDevice.class.getName()
+                + ", this should not occur under normal circumstances!");
       }
 
       GpuDevice gpuDevice = (GpuDevice) gpuDeviceSerializable;
@@ -134,8 +140,8 @@ public class GpuResourceAllocator {
       if (!allowedGpuDevices.contains(gpuDevice)) {
         throw new ResourceHandlerException(
             "Try to recover device = " + gpuDevice
-                + " however it is not in allowed device list:" + StringUtils
-                .join(",", allowedGpuDevices));
+                + " however it is not in the allowed device list:" +
+                StringUtils.join(",", allowedGpuDevices));
       }
 
       // Make sure it is not occupied by anybody else
@@ -168,7 +174,7 @@ public class GpuResourceAllocator {
   }
 
   /**
-   * Assign GPU to requestor
+   * Assign GPU to the specified container.
    * @param container container to allocate
    * @return allocation results.
    * @throws ResourceHandlerException When failed to assign GPUs.
@@ -177,12 +183,11 @@ public class GpuResourceAllocator {
       throws ResourceHandlerException {
     GpuAllocation allocation = internalAssignGpus(container);
 
-    // Wait for a maximum of 120 seconds if no available GPU are there which
-    // are yet to be released.
-    final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP;
+    // Wait for a maximum of waitPeriodForResource milliseconds if no
+    // available GPUs are there which are yet to be released.
     int timeWaiting = 0;
     while (allocation == null) {
-      if (timeWaiting >= timeoutMsecs) {
+      if (timeWaiting >= waitPeriodForResource) {
         break;
       }
 
@@ -196,6 +201,8 @@ public class GpuResourceAllocator {
         allocation = internalAssignGpus(container);
       } catch (InterruptedException e) {
         // On any interrupt, break the loop and continue execution.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting for available GPU");
         break;
       }
     }
@@ -215,8 +222,15 @@ public class GpuResourceAllocator {
     Resource requestedResource = container.getResource();
     ContainerId containerId = container.getContainerId();
     int numRequestedGpuDevices = getRequestedGpus(requestedResource);
-    // Assign Gpus to container if requested some.
+
+    // Assign GPUs to container if requested some.
     if (numRequestedGpuDevices > 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Trying to assign %d GPUs to container: %s" +
+            ", #AvailableGPUs=%d, #ReleasingGPUs=%d",
+            numRequestedGpuDevices, containerId,
+            getAvailableGpus(), getReleasingGpus()));
+      }
       if (numRequestedGpuDevices > getAvailableGpus()) {
         // If there are some devices which are getting released, wait for few
         // seconds to get it.
@@ -227,8 +241,9 @@ public class GpuResourceAllocator {
 
       if (numRequestedGpuDevices > getAvailableGpus()) {
         throw new ResourceHandlerException(
-            getResourceHandlerExceptionMessage(numRequestedGpuDevices,
-                containerId));
+            "Failed to find enough GPUs, requestor=" + containerId +
+                ", #RequestedGPUs=" + numRequestedGpuDevices +
+                ", #AvailableGPUs=" + getAvailableGpus());
       }
 
       Set<GpuDevice> assignedGpus = new TreeSet<>();
@@ -250,7 +265,7 @@ public class GpuResourceAllocator {
           nmContext.getNMStateStore().storeAssignedResources(container, 
GPU_URI,
               new ArrayList<>(assignedGpus));
         } catch (IOException e) {
-          cleanupAssignGpus(containerId);
+          unassignGpus(containerId);
           throw new ResourceHandlerException(e);
         }
       }
@@ -276,35 +291,34 @@ public class GpuResourceAllocator {
   }
 
   /**
-   * Clean up all Gpus assigned to containerId
+   * Clean up all GPUs assigned to containerId.
    * @param containerId containerId
    */
-  public synchronized void cleanupAssignGpus(ContainerId containerId) {
-    Iterator<Map.Entry<GpuDevice, ContainerId>> iter =
-        usedDevices.entrySet().iterator();
-    while (iter.hasNext()) {
-      if (iter.next().getValue().equals(containerId)) {
-        iter.remove();
-      }
+  public synchronized void unassignGpus(ContainerId containerId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to unassign GPU device from container " + containerId);
     }
+    usedDevices.entrySet().removeIf(entry ->
+        entry.getValue().equals(containerId));
   }
 
   @VisibleForTesting
-  public synchronized Map<GpuDevice, ContainerId> 
getDeviceAllocationMappingCopy() {
-    return new HashMap<>(usedDevices);
+  public synchronized Map<GpuDevice, ContainerId> getDeviceAllocationMapping() 
{
+    return ImmutableMap.copyOf(usedDevices);
   }
 
-  public synchronized List<GpuDevice> getAllowedGpusCopy() {
-    return new ArrayList<>(allowedGpuDevices);
+  public synchronized List<GpuDevice> getAllowedGpus() {
+    return ImmutableList.copyOf(allowedGpuDevices);
   }
 
-  public synchronized List<AssignedGpuDevice> getAssignedGpusCopy() {
-    List<AssignedGpuDevice> assigns = new ArrayList<>();
-    for (Map.Entry<GpuDevice, ContainerId> entry : usedDevices.entrySet()) {
-      assigns.add(new AssignedGpuDevice(entry.getKey().getIndex(),
-          entry.getKey().getMinorNumber(), entry.getValue()));
-    }
-    return assigns;
+  public synchronized List<AssignedGpuDevice> getAssignedGpus() {
+    return usedDevices.entrySet().stream()
+        .map(e -> {
+          final GpuDevice gpu = e.getKey();
+          ContainerId containerId = e.getValue();
+          return new AssignedGpuDevice(gpu.getIndex(), gpu.getMinorNumber(),
+              containerId);
+        }).collect(Collectors.toList());
   }
 
   @Override
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java
index aa52dd3..71b041b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java
@@ -177,7 +177,7 @@ public class GpuResourceHandlerImpl implements 
ResourceHandler {
   @Override
   public synchronized List<PrivilegedOperation> postComplete(
       ContainerId containerId) throws ResourceHandlerException {
-    gpuAllocator.cleanupAssignGpus(containerId);
+    gpuAllocator.unassignGpus(containerId);
     cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
         containerId.toString());
     return null;
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuResourcePlugin.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuResourcePlugin.java
index 7719afb..2b06f31 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuResourcePlugin.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuResourcePlugin.java
@@ -97,9 +97,9 @@ public class GpuResourcePlugin implements ResourcePlugin {
 
     GpuResourceAllocator gpuResourceAllocator =
         gpuResourceHandler.getGpuAllocator();
-    List<GpuDevice> totalGpus = gpuResourceAllocator.getAllowedGpusCopy();
+    List<GpuDevice> totalGpus = gpuResourceAllocator.getAllowedGpus();
     List<AssignedGpuDevice> assignedGpuDevices =
-        gpuResourceAllocator.getAssignedGpusCopy();
+        gpuResourceAllocator.getAssignedGpus();
 
     return new NMGpuResourceInfo(gpuDeviceInformation, totalGpus,
         assignedGpuDevices);
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceAllocator.java
new file mode 100644
index 0000000..a661de7
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceAllocator.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
+import static 
org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider.initResourceTypes;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu.GpuResourceAllocator.GpuAllocation;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Unit tests for GpuResourceAllocator.
+ */
+public class TestGpuResourceAllocator {
+  private static final int WAIT_PERIOD_FOR_RESOURCE = 100;
+
+  private static class ContainerMatcher implements ArgumentMatcher<Container> {
+
+    private Container container;
+
+    ContainerMatcher(Container container) {
+      this.container = container;
+    }
+
+    @Override
+    public boolean matches(Container other) {
+      long expectedId = container.getContainerId().getContainerId();
+      long otherId = other.getContainerId().getContainerId();
+      return expectedId == otherId;
+    }
+  }
+
+  @Captor
+  private ArgumentCaptor<List<Serializable>> gpuCaptor;
+
+  @Mock
+  private NMContext nmContext;
+
+  @Mock
+  private NMStateStoreService nmStateStore;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  private GpuResourceAllocator testSubject;
+
+  @Before
+  public void setup() {
+    initResourceTypes(ResourceInformation.GPU_URI);
+    MockitoAnnotations.initMocks(this);
+    testSubject = createTestSubject(WAIT_PERIOD_FOR_RESOURCE);
+  }
+
+  private GpuResourceAllocator createTestSubject(int waitPeriodForResource) {
+    when(nmContext.getNMStateStore()).thenReturn(nmStateStore);
+    when(nmContext.getContainers()).thenReturn(new ConcurrentHashMap<>());
+    return new GpuResourceAllocator(nmContext, waitPeriodForResource);
+  }
+
+  private Resource createGpuResourceRequest(int gpus) {
+    Resource res = Resource.newInstance(1024, 1);
+
+    if (gpus > 0) {
+      res.setResourceValue(ResourceInformation.GPU_URI, gpus);
+    }
+    return res;
+  }
+
+  private List<Container> createMockContainers(int gpus,
+      int numberOfContainers) {
+    final long id = 111L;
+
+    List<Container> containers = Lists.newArrayList();
+    for (int i = 0; i < numberOfContainers; i++) {
+      containers.add(createMockContainer(gpus, id + i));
+    }
+    return containers;
+  }
+
+  private Container createMockContainer(int gpus, long id) {
+    Resource res = createGpuResourceRequest(gpus);
+    ContainerId containerId = mock(ContainerId.class);
+    when(containerId.getContainerId()).thenReturn(id);
+
+    Container container = mock(Container.class);
+    when(container.getResource()).thenReturn(res);
+    when(container.getContainerId()).thenReturn(containerId);
+    when(container.getContainerState()).thenReturn(ContainerState.RUNNING);
+    nmContext.getContainers().put(containerId, container);
+
+    return container;
+  }
+
+  private void createAndAddGpus(int numberOfGpus) {
+    for (int i = 0; i < numberOfGpus; i++) {
+      testSubject.addGpu(new GpuDevice(1, i));
+    }
+
+    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(0, testSubject.getAssignedGpus().size());
+    assertEquals(numberOfGpus, testSubject.getAllowedGpus().size());
+    assertEquals(numberOfGpus, testSubject.getAvailableGpus());
+  }
+
+  private void addGpus(GpuDevice... gpus) {
+    for (GpuDevice gpu : gpus) {
+      testSubject.addGpu(gpu);
+    }
+    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(0, testSubject.getAssignedGpus().size());
+    assertEquals(gpus.length, testSubject.getAllowedGpus().size());
+    assertEquals(gpus.length, testSubject.getAvailableGpus());
+  }
+
+  private void addGpusAndDontVerify(GpuDevice... gpus) {
+    for (GpuDevice gpu : gpus) {
+      testSubject.addGpu(gpu);
+    }
+  }
+
+  private void setupContainerAsReleasingGpus(Container... releasingContainers) 
{
+    ContainerState[] finalStates = new ContainerState[] {
+        ContainerState.KILLING, ContainerState.DONE,
+        ContainerState.LOCALIZATION_FAILED,
+        ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
+        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerState.EXITED_WITH_SUCCESS
+    };
+
+    final Random random = new Random();
+    for (Container container : releasingContainers) {
+      ContainerState state = finalStates[random.nextInt(finalStates.length)];
+      when(container.getContainerState()).thenReturn(state);
+      when(container.isContainerInFinalStates()).thenReturn(true);
+    }
+  }
+
+  private void assertAllocatedGpu(GpuDevice expectedGpu, Container container,
+      GpuAllocation allocation) throws IOException {
+    assertEquals(1, allocation.getAllowedGPUs().size());
+    assertEquals(0, allocation.getDeniedGPUs().size());
+
+    Set<GpuDevice> allowedGPUs = allocation.getAllowedGPUs();
+
+    GpuDevice allocatedGpu = allowedGPUs.iterator().next();
+    assertEquals(expectedGpu, allocatedGpu);
+    assertAssignmentInStateStore(expectedGpu, container);
+  }
+
+  private void assertAllocatedGpus(int gpus, int deniedGpus,
+      Container container,
+      GpuAllocation allocation) throws IOException {
+    assertEquals(gpus, allocation.getAllowedGPUs().size());
+    assertEquals(deniedGpus, allocation.getDeniedGPUs().size());
+    assertAssignmentInStateStore(gpus, container);
+  }
+
+  private void assertNoAllocation(GpuAllocation allocation) {
+    assertEquals(1, allocation.getDeniedGPUs().size());
+    assertEquals(0, allocation.getAllowedGPUs().size());
+    verifyZeroInteractions(nmStateStore);
+  }
+
+  private void assertAssignmentInStateStore(GpuDevice expectedGpu,
+      Container container) throws IOException {
+    verify(nmStateStore).storeAssignedResources(
+        argThat(new ContainerMatcher(container)), eq(GPU_URI),
+        gpuCaptor.capture());
+
+    List<Serializable> gpuList = gpuCaptor.getValue();
+    assertEquals(1, gpuList.size());
+    assertEquals(expectedGpu, gpuList.get(0));
+  }
+
+  private void assertAssignmentInStateStore(int gpus,
+      Container container) throws IOException {
+    verify(nmStateStore).storeAssignedResources(
+        argThat(new ContainerMatcher(container)), eq(GPU_URI),
+        gpuCaptor.capture());
+
+    List<Serializable> gpuList = gpuCaptor.getValue();
+    assertEquals(gpus, gpuList.size());
+  }
+
+  private static Set<GpuAllocation> findDuplicates(
+      List<GpuAllocation> allocations) {
+    final Set<GpuAllocation> result = new HashSet<>();
+    final Set<GpuAllocation> tmpSet = new HashSet<>();
+
+    for (GpuAllocation allocation : allocations) {
+      if (!tmpSet.add(allocation)) {
+        result.add(allocation);
+      }
+    }
+    return result;
+  }
+
+  @Test
+  public void testNewGpuAllocatorHasEmptyCollectionOfDevices() {
+    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(0, testSubject.getAssignedGpus().size());
+    assertEquals(0, testSubject.getAllowedGpus().size());
+    assertEquals(0, testSubject.getAvailableGpus());
+  }
+
+  @Test
+  public void testAddOneDevice() {
+    addGpus(new GpuDevice(1, 1));
+    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(0, testSubject.getAssignedGpus().size());
+  }
+
+  @Test
+  public void testAddMoreDevices() {
+    addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3));
+    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(0, testSubject.getAssignedGpus().size());
+  }
+
+  @Test
+  public void testAddMoreDevicesWithSameData() {
+    addGpusAndDontVerify(new GpuDevice(1, 1), new GpuDevice(1, 1));
+    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(0, testSubject.getAssignedGpus().size());
+    assertEquals(1, testSubject.getAllowedGpus().size());
+    assertEquals(1, testSubject.getAvailableGpus());
+  }
+
+  @Test
+  public void testRequestZeroGpu() throws ResourceHandlerException {
+    addGpus(new GpuDevice(1, 1));
+
+    Container container = createMockContainer(0, 5L);
+    GpuAllocation allocation =
+        testSubject.assignGpus(container);
+
+    assertNoAllocation(allocation);
+  }
+
+  @Test
+  public void testRequestOneGpu() throws ResourceHandlerException, IOException 
{
+    GpuDevice gpu = new GpuDevice(1, 1);
+    addGpus(gpu);
+
+    Container container = createMockContainer(1, 5L);
+    GpuAllocation allocation =
+        testSubject.assignGpus(container);
+
+    assertEquals(1, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(1, testSubject.getAssignedGpus().size());
+    assertEquals(1, testSubject.getAllowedGpus().size());
+    assertEquals(0, testSubject.getAvailableGpus());
+
+    assertAllocatedGpu(gpu, container, allocation);
+  }
+
+  @Test
+  public void testRequestMoreThanAvailableGpu()
+      throws ResourceHandlerException {
+    addGpus(new GpuDevice(1, 1));
+    Container container = createMockContainer(2, 5L);
+
+    exception.expect(ResourceHandlerException.class);
+    exception.expectMessage("Failed to find enough GPUs");
+    testSubject.assignGpus(container);
+  }
+
+  @Test
+  public void testRequestMoreThanAvailableGpuAndOneContainerIsReleasingGpus()
+      throws ResourceHandlerException, IOException {
+    addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3));
+    Container container = createMockContainer(2, 5L);
+    GpuAllocation allocation = testSubject.assignGpus(container);
+    assertAllocatedGpus(2, 1, container, allocation);
+
+    assertEquals(2, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(2, testSubject.getAssignedGpus().size());
+    assertEquals(3, testSubject.getAllowedGpus().size());
+    assertEquals(1, testSubject.getAvailableGpus());
+
+    setupContainerAsReleasingGpus(container);
+    Container container2 = createMockContainer(2, 6L);
+
+    exception.expect(ResourceHandlerException.class);
+    exception.expectMessage("as some other containers might not " +
+        "releasing GPUs");
+    GpuAllocation allocation2 = testSubject.assignGpus(container2);
+    assertAllocatedGpus(2, 1, container, allocation2);
+  }
+
+  @Test
+  public void testThreeContainersJustTwoOfThemSatisfied()
+      throws ResourceHandlerException, IOException {
+    addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2),
+            new GpuDevice(1, 3), new GpuDevice(1, 4),
+            new GpuDevice(1, 5), new GpuDevice(1, 6));
+    Container container = createMockContainer(3, 5L);
+    Container container2 = createMockContainer(2, 6L);
+    Container container3 = createMockContainer(2, 6L);
+
+    GpuAllocation allocation = testSubject.assignGpus(container);
+    assertAllocatedGpus(3, 3, container, allocation);
+    assertEquals(3, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(3, testSubject.getAssignedGpus().size());
+    assertEquals(6, testSubject.getAllowedGpus().size());
+    assertEquals(3, testSubject.getAvailableGpus());
+
+    GpuAllocation allocation2 = testSubject.assignGpus(container2);
+    assertAllocatedGpus(2, 4, container2, allocation2);
+    assertEquals(5, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(5, testSubject.getAssignedGpus().size());
+    assertEquals(6, testSubject.getAllowedGpus().size());
+    assertEquals(1, testSubject.getAvailableGpus());
+
+    exception.expect(ResourceHandlerException.class);
+    exception.expectMessage("Failed to find enough GPUs");
+    testSubject.assignGpus(container3);
+  }
+
+  @Test
+  public void testReleaseAndAssignGpus()
+      throws ResourceHandlerException, IOException {
+    addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3));
+    Container container = createMockContainer(2, 5L);
+    GpuAllocation allocation = testSubject.assignGpus(container);
+    assertAllocatedGpus(2, 1, container, allocation);
+
+    assertEquals(2, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(2, testSubject.getAssignedGpus().size());
+    assertEquals(3, testSubject.getAllowedGpus().size());
+    assertEquals(1, testSubject.getAvailableGpus());
+
+    setupContainerAsReleasingGpus(container);
+    Container container2 = createMockContainer(2, 6L);
+    try {
+      testSubject.assignGpus(container2);
+    } catch (ResourceHandlerException e) {
+      //intended as we have not enough GPUs available
+    }
+
+    assertEquals(2, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(2, testSubject.getAssignedGpus().size());
+    assertEquals(3, testSubject.getAllowedGpus().size());
+    assertEquals(1, testSubject.getAvailableGpus());
+
+    testSubject.unassignGpus(container.getContainerId());
+    GpuAllocation allocation2 = testSubject.assignGpus(container2);
+    assertAllocatedGpus(2, 1, container, allocation2);
+  }
+
+  @Test
+  public void testCreateLotsOfContainersVerifyGpuAssignmentsAreCorrect()
+      throws ResourceHandlerException, IOException {
+    createAndAddGpus(100);
+
+    List<Container> containers = createMockContainers(3, 33);
+    List<GpuAllocation> allocations = Lists.newArrayList();
+    for (Container container : containers) {
+      GpuAllocation allocation = testSubject.assignGpus(container);
+      allocations.add(allocation);
+      assertAllocatedGpus(3, 97, container, allocation);
+    }
+
+    assertEquals(99, testSubject.getDeviceAllocationMapping().size());
+    assertEquals(99, testSubject.getAssignedGpus().size());
+    assertEquals(100, testSubject.getAllowedGpus().size());
+    assertEquals(1, testSubject.getAvailableGpus());
+
+    Set<GpuAllocation> duplicateAllocations = findDuplicates(allocations);
+    assertEquals(0, duplicateAllocations.size());
+  }
+
+  @Test
+  public void testGpuGetsUnassignedWhenStateStoreThrowsException()
+      throws ResourceHandlerException, IOException {
+    doThrow(new IOException("Failed to save container mappings " +
+        "to NM state store!"))
+        .when(nmStateStore).storeAssignedResources(any(Container.class),
+        anyString(), anyList());
+
+    createAndAddGpus(1);
+
+    exception.expect(ResourceHandlerException.class);
+    exception.expectMessage("Failed to save container mappings " +
+        "to NM state store");
+    Container container = createMockContainer(1, 5L);
+    testSubject.assignGpus(container);
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandlerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandlerImpl.java
index 392497f..777a85b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandlerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/TestGpuResourceHandlerImpl.java
@@ -158,7 +158,7 @@ public class TestGpuResourceHandlerImpl {
     gpuResourceHandler.bootstrap(conf);
 
     List<GpuDevice> allowedGpus =
-        gpuResourceHandler.getGpuAllocator().getAllowedGpusCopy();
+        gpuResourceHandler.getGpuAllocator().getAllowedGpus();
     assertEquals("Unexpected number of allowed GPU devices!", 1,
         allowedGpus.size());
     assertEquals("Expected GPU device does not equal to found device!",
@@ -497,7 +497,7 @@ public class TestGpuResourceHandlerImpl {
     gpuResourceHandler.reacquireContainer(getContainerId(1));
 
     Map<GpuDevice, ContainerId> deviceAllocationMapping =
-        gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy();
+        gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
     assertEquals("Unexpected number of allocated GPU devices!", 2,
         deviceAllocationMapping.size());
     assertTrue("Expected GPU device is not found in allocations!",
@@ -533,7 +533,7 @@ public class TestGpuResourceHandlerImpl {
 
     // Make sure internal state not changed.
     deviceAllocationMapping =
-        gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy();
+        gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
     assertEquals("Unexpected number of allocated GPU devices!",
         2, deviceAllocationMapping.size());
     assertTrue("Expected GPU devices are not found in allocations!",
@@ -568,7 +568,7 @@ public class TestGpuResourceHandlerImpl {
 
     // Make sure internal state not changed.
     deviceAllocationMapping =
-        gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy();
+        gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
     assertEquals("Unexpected number of allocated GPU devices!",
         2, deviceAllocationMapping.size());
     assertTrue("Expected GPU devices are not found in allocations!",


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to