Repository: samza Updated Branches: refs/heads/master 64f3f6b32 -> dfdc35e7e
SAMZA-786: improve reliability of host affinity tests Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dfdc35e7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dfdc35e7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dfdc35e7 Branch: refs/heads/master Commit: dfdc35e7e1484180932591b6d0e804fbcbfd7b40 Parents: 64f3f6b Author: Jacob Maes <[email protected]> Authored: Tue Oct 6 17:37:20 2015 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Oct 6 17:37:20 2015 -0700 ---------------------------------------------------------------------- .../samza/job/yarn/ContainerRequestState.java | 8 +- .../samza/job/yarn/TestContainerAllocator.java | 91 +++++++-------- .../yarn/TestHostAwareContainerAllocator.java | 111 ++++++++++--------- .../job/yarn/util/MockContainerListener.java | 80 +++++++++++++ .../yarn/util/MockContainerRequestState.java | 69 ++++++++++++ 5 files changed, 253 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java index b5e0368..4b36a91 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java @@ -200,8 +200,11 @@ public class ContainerRequestState { /** * If requestQueue is empty, all extra containers in the buffer should be released and update the entire system's state * Needs to be synchronized because it is modifying shared state buffers + * @return the number of containers released. */ - public synchronized void releaseExtraContainers() { + public synchronized int releaseExtraContainers() { + int numReleasedContainers = 0; + if (hostAffinityEnabled) { if (requestsQueue.isEmpty()) { log.info("Requests Queue is empty. Should clear up state."); @@ -213,6 +216,7 @@ public class ContainerRequestState { for (Container c : containers) { log.info("Releasing extra container {} allocated on {}", c.getId(), host); amClient.releaseAssignedContainer(c.getId()); + numReleasedContainers++; } } } @@ -227,10 +231,12 @@ public class ContainerRequestState { Container c = availableContainers.remove(0); log.info("Releasing extra allocated container - {}", c.getId()); amClient.releaseAssignedContainer(c.getId()); + numReleasedContainers++; } clearState(); } } + return numReleasedContainers; } /** http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java index 0d07e28..01f32a4 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java @@ -19,6 +19,11 @@ package org.apache.samza.job.yarn; +import java.lang.reflect.Field; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; @@ -31,7 +36,10 @@ import org.apache.samza.coordinator.server.HttpServer; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; -import org.apache.samza.job.yarn.util.*; +import org.apache.samza.job.yarn.util.MockContainerListener; +import org.apache.samza.job.yarn.util.MockContainerRequestState; +import org.apache.samza.job.yarn.util.MockHttpServer; +import org.apache.samza.job.yarn.util.TestAMRMClientImpl; import org.apache.samza.job.yarn.util.TestUtil; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletHolder; @@ -39,13 +47,10 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.lang.reflect.Field; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestContainerAllocator { private final int ALLOCATOR_SLEEP_TIME = 10; @@ -54,6 +59,7 @@ public class TestContainerAllocator { private AMRMClientAsyncImpl amRmClientAsync; private TestAMRMClientImpl testAMRMClient; + private MockContainerRequestState requestState; private ContainerAllocator containerAllocator; private Thread allocatorThread; @@ -97,11 +103,16 @@ public class TestContainerAllocator { // Initialize certain state variables (mostly to avoid NPE) state.coordinatorUrl = new URL("http://localhost:7778/"); + requestState = new MockContainerRequestState(amRmClientAsync, false); containerAllocator = new ContainerAllocator( amRmClientAsync, TestUtil.getContainerUtil(config, state), ALLOCATOR_SLEEP_TIME ); + Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); + requestStateField.setAccessible(true); + requestStateField.set(containerAllocator, requestState); + allocatorThread = new Thread(containerAllocator); } @@ -116,21 +127,6 @@ public class TestContainerAllocator { */ @Test public void testAddContainer() throws Exception { - - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - - allocatorThread.start(); - - containerAllocator.requestContainers(new HashMap<Integer, String>() { - { - put(0, ANY_HOST); - put(1, ANY_HOST); - } - }); - - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - assertNull(requestState.getContainersOnAHost("abc")); assertNull(requestState.getContainersOnAHost(ANY_HOST)); @@ -160,10 +156,6 @@ public class TestContainerAllocator { containerAllocator.requestContainers(containersToHostMapping); - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - assertNotNull(testAMRMClient.requests); assertEquals(4, testAMRMClient.requests.size()); @@ -191,10 +183,6 @@ public class TestContainerAllocator { containerAllocator.requestContainers(containersToHostMapping); - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - assertNotNull(requestState); assertNotNull(requestState.getRequestsQueue()); @@ -212,9 +200,27 @@ public class TestContainerAllocator { */ @Test public void testAllocatorReleasesExtraContainers() throws Exception { - Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123); - Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); - Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123); + final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123); + final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); + final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123); + + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(3, 2, null, new Runnable() { + @Override + public void run() { + assertNotNull(testAMRMClient.getRelease()); + assertEquals(2, testAMRMClient.getRelease().size()); + assertTrue(testAMRMClient.getRelease().contains(container1.getId())); + assertTrue(testAMRMClient.getRelease().contains(container2.getId())); + + // Test that state is cleaned up + assertEquals(0, requestState.getRequestsQueue().size()); + assertEquals(0, requestState.getRequestsToCountMap().size()); + assertNull(requestState.getContainersOnAHost("abc")); + assertNull(requestState.getContainersOnAHost("def")); + } + }); + requestState.registerContainerListener(listener); allocatorThread.start(); @@ -224,22 +230,7 @@ public class TestContainerAllocator { containerAllocator.addContainer(container1); containerAllocator.addContainer(container2); - Thread.sleep(600); - - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - - assertNotNull(testAMRMClient.getRelease()); - assertEquals(2, testAMRMClient.getRelease().size()); - assertTrue(testAMRMClient.getRelease().contains(container1.getId())); - assertTrue(testAMRMClient.getRelease().contains(container2.getId())); - - // Test that state is cleaned up - assertEquals(0, requestState.getRequestsQueue().size()); - assertEquals(0, requestState.getRequestsToCountMap().size()); - assertNull(requestState.getContainersOnAHost("abc")); - assertNull(requestState.getContainersOnAHost("def")); + listener.verify(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java index ee4a5b0..663ea25 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java @@ -18,6 +18,12 @@ */ package org.apache.samza.job.yarn; +import java.lang.reflect.Field; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; @@ -30,6 +36,8 @@ import org.apache.samza.coordinator.server.HttpServer; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; +import org.apache.samza.job.yarn.util.MockContainerListener; +import org.apache.samza.job.yarn.util.MockContainerRequestState; import org.apache.samza.job.yarn.util.MockContainerUtil; import org.apache.samza.job.yarn.util.MockHttpServer; import org.apache.samza.job.yarn.util.TestAMRMClientImpl; @@ -40,16 +48,10 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.lang.reflect.Field; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestHostAwareContainerAllocator { private static final int ALLOCATOR_SLEEP_TIME = 1; @@ -60,6 +62,7 @@ public class TestHostAwareContainerAllocator { private AMRMClientAsyncImpl amRmClientAsync; private TestAMRMClientImpl testAMRMClient; + private MockContainerRequestState requestState; private HostAwareContainerAllocator containerAllocator; private Thread allocatorThread; private ContainerUtil containerUtil; @@ -114,12 +117,17 @@ public class TestHostAwareContainerAllocator { containerUtil = TestUtil.getContainerUtil(getConfig(), state); + requestState = new MockContainerRequestState(amRmClientAsync, true); containerAllocator = new HostAwareContainerAllocator( amRmClientAsync, containerUtil, ALLOCATOR_SLEEP_TIME, CONTAINER_REQUEST_TIMEOUT ); + Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); + requestStateField.setAccessible(true); + requestStateField.set(containerAllocator, requestState); + allocatorThread = new Thread(containerAllocator); } @@ -131,9 +139,27 @@ public class TestHostAwareContainerAllocator { @Test public void testAllocatorReleasesExtraContainers() throws Exception { - Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123); - Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); - Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123); + final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123); + final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); + final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123); + + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(3, 2, null, new Runnable() { + @Override + public void run() { + assertNotNull(testAMRMClient.getRelease()); + assertEquals(2, testAMRMClient.getRelease().size()); + assertTrue(testAMRMClient.getRelease().contains(container1.getId())); + assertTrue(testAMRMClient.getRelease().contains(container2.getId())); + + // Test that state is cleaned up + assertEquals(0, requestState.getRequestsQueue().size()); + assertEquals(0, requestState.getRequestsToCountMap().size()); + assertNull(requestState.getContainersOnAHost("abc")); + assertNull(requestState.getContainersOnAHost("def")); + } + }); + requestState.registerContainerListener(listener); allocatorThread.start(); @@ -143,22 +169,7 @@ public class TestHostAwareContainerAllocator { containerAllocator.addContainer(container1); containerAllocator.addContainer(container2); - Thread.sleep(600); - - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - - assertNotNull(testAMRMClient.getRelease()); - assertEquals(2, testAMRMClient.getRelease().size()); - assertTrue(testAMRMClient.getRelease().contains(container1.getId())); - assertTrue(testAMRMClient.getRelease().contains(container2.getId())); - - // Test that state is cleaned up - assertEquals(0, requestState.getRequestsQueue().size()); - assertEquals(0, requestState.getRequestsToCountMap().size()); - assertNull(requestState.getContainersOnAHost("abc")); - assertNull(requestState.getContainersOnAHost("def")); + listener.verify(); } /** @@ -176,10 +187,6 @@ public class TestHostAwareContainerAllocator { containerAllocator.requestContainers(containersToHostMapping); - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - assertNotNull(requestState); assertNotNull(requestState.getRequestsQueue()); @@ -195,10 +202,6 @@ public class TestHostAwareContainerAllocator { */ @Test public void testAddContainerWithHostAffinity() throws Exception { - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - containerAllocator.requestContainers(new HashMap<Integer, String>() { { put(0, "abc"); @@ -206,8 +209,6 @@ public class TestHostAwareContainerAllocator { } }); - allocatorThread.start(); - assertNotNull(requestState.getContainersOnAHost("abc")); assertEquals(0, requestState.getContainersOnAHost("abc").size()); @@ -221,16 +222,18 @@ public class TestHostAwareContainerAllocator { containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123)); assertNotNull(requestState.getContainersOnAHost("abc")); - assertTrue(requestState.getContainersOnAHost("abc").size() == 1); + assertEquals(1, requestState.getContainersOnAHost("abc").size()); assertNotNull(requestState.getContainersOnAHost("xyz")); - assertTrue(requestState.getContainersOnAHost("xyz").size() == 1); + assertEquals(1, requestState.getContainersOnAHost("xyz").size()); assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1); - assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), requestState.getContainersOnAHost(ANY_HOST).get(0).getId()); + assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), + requestState.getContainersOnAHost(ANY_HOST).get(0).getId()); } + @Test public void testRequestContainers() throws Exception { Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { @@ -245,11 +248,6 @@ public class TestHostAwareContainerAllocator { containerAllocator.requestContainers(containersToHostMapping); - - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - assertNotNull(testAMRMClient.requests); assertEquals(4, testAMRMClient.requests.size()); @@ -284,10 +282,6 @@ public class TestHostAwareContainerAllocator { }; containerAllocator.requestContainers(containersToHostMapping); - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - ContainerRequestState requestState = (ContainerRequestState) requestStateField.get(containerAllocator); - assertNotNull(requestState.getRequestsQueue()); assertTrue(requestState.getRequestsQueue().size() == 2); @@ -298,19 +292,26 @@ public class TestHostAwareContainerAllocator { assertNotNull(requestState.getRequestsToCountMap().get("def")); assertTrue(requestState.getRequestsToCountMap().get("def").get() == 1); - allocatorThread.start(); + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(2, 0, new Runnable() { + @Override + public void run() { + assertNull(requestState.getContainersOnAHost("xyz")); + assertNull(requestState.getContainersOnAHost("zzz")); + assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); + assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2); + } + }, null); + requestState.registerContainerListener(listener); - Thread.sleep(600); + allocatorThread.start(); Container container0 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "xyz", 123); Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "zzz", 123); containerAllocator.addContainer(container0); containerAllocator.addContainer(container1); - assertNull(requestState.getContainersOnAHost("xyz")); - assertNull(requestState.getContainersOnAHost("zzz")); - assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); - assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2); + listener.verify(); Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java new file mode 100644 index 0000000..8fc0b98 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.yarn.util; + +import org.apache.hadoop.yarn.api.records.Container; + +import static org.junit.Assert.assertTrue; + +public class MockContainerListener { + private static final int NUM_CONDITIONS = 2; + private boolean allContainersAdded = false; + private boolean allContainersReleased = false; + private final int numExpectedContainersAdded; + private final int numExpectedContainersReleased; + private final Runnable addContainerAssertions; + private final Runnable releaseContainerAssertions; + + public MockContainerListener(int numExpectedContainersAdded, + int numExpectedContainersReleased, + Runnable addContainerAssertions, + Runnable releaseContainerAssertions) { + this.numExpectedContainersAdded = numExpectedContainersAdded; + this.numExpectedContainersReleased = numExpectedContainersReleased; + this.addContainerAssertions = addContainerAssertions; + this.releaseContainerAssertions = releaseContainerAssertions; + } + + public synchronized void postAddContainer(Container container, int totalAddedContainers) { + if (totalAddedContainers == numExpectedContainersAdded) { + if (addContainerAssertions != null) { + addContainerAssertions.run(); + } + + allContainersAdded = true; + this.notifyAll(); + } + } + + public synchronized void postReleaseContainers(int totalReleasedContainers) { + if (totalReleasedContainers == numExpectedContainersReleased) { + if (releaseContainerAssertions != null) { + releaseContainerAssertions.run(); + } + + allContainersReleased = true; + this.notifyAll(); + } + } + + public synchronized void verify() { + // There could be 1 notifyAll() for each condition, so we must wait up to that many times + for (int i = 0; i < NUM_CONDITIONS && !(allContainersAdded && allContainersReleased); i++) { + try { + this.wait(5000); + } catch (InterruptedException e) { + // Do nothing + } + } + + assertTrue("Not all containers were added.", allContainersAdded); + assertTrue("Not all containers were released.", allContainersReleased); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dfdc35e7/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java new file mode 100644 index 0000000..e7441e5 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn.util; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.samza.job.yarn.ContainerRequestState; + + +public class MockContainerRequestState extends ContainerRequestState { + private final List<MockContainerListener> _mockContainerListeners = new ArrayList<MockContainerListener>(); + private int numAddedContainers = 0; + private int numReleasedContainers = 0; + + public MockContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient, + boolean hostAffinityEnabled) { + super(amClient, hostAffinityEnabled); + } + + + @Override + public synchronized void addContainer(Container container) { + super.addContainer(container); + + numAddedContainers++; + for (MockContainerListener listener : _mockContainerListeners) { + listener.postAddContainer(container, numAddedContainers); + } + } + + @Override + public synchronized int releaseExtraContainers() { + numReleasedContainers += super.releaseExtraContainers(); + + for (MockContainerListener listener : _mockContainerListeners) { + listener.postReleaseContainers(numReleasedContainers); + } + + return numAddedContainers; + } + + public void registerContainerListener(MockContainerListener listener) { + _mockContainerListeners.add(listener); + } + + public void clearContainerListeners() { + _mockContainerListeners.clear(); + } + +}
