Repository: samza Updated Branches: refs/heads/master 7e68e4b10 -> 03e5026cf
SAMZA-1552: Host affinity improvements - Improve matching of hosts to allocated resources Author: Jagadish <[email protected]> Reviewers: Prateek <[email protected]> Closes #401 from vjagadish1989/host-affinity-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/03e5026c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/03e5026c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/03e5026c Branch: refs/heads/master Commit: 03e5026cf56d955c203e701c80e28c42d792fbce Parents: 7e68e4b Author: Jagadish <[email protected]> Authored: Wed Jan 31 21:38:14 2018 -0800 Committer: Jagadish <[email protected]> Committed: Wed Jan 31 21:38:14 2018 -0800 ---------------------------------------------------------------------- .../clustermanager/ResourceRequestState.java | 29 +++- .../MockHostAwareContainerAllocator.java | 68 +++++++++ .../TestContainerProcessManager.java | 138 ++++++++++++++----- .../TestHostAwareContainerAllocator.java | 40 ++++++ 4 files changed, 234 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/03e5026c/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java index 77c192e..fe2067c 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java @@ -171,7 +171,9 @@ public class ResourceRequestState { public void updateStateAfterAssignment(SamzaResourceRequest request, String assignedHost, SamzaResource samzaResource) { synchronized (lock) { requestsQueue.remove(request); + // A reference for the resource could either be held in the preferred host buffer or in the ANY_HOST buffer. allocatedResources.get(assignedHost).remove(samzaResource); + allocatedResources.get(ANY_HOST).remove(samzaResource); if (hostAffinityEnabled) { // assignedHost may not always be the preferred host. // Hence, we should safely decrement the counter for the preferredHost @@ -266,18 +268,33 @@ public class ResourceRequestState { /** * Retrieves, but does not remove, the first allocated resource on the specified host. * - * @param host the host for which a resource is needed. - * @return the first {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one. + * @param host the host for which a resource is needed. + * @return a {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one. */ - public SamzaResource peekResource(String host) { synchronized (lock) { - List<SamzaResource> resourcesOnTheHost = this.allocatedResources.get(host); + List<SamzaResource> resourcesOnPreferredHostBuffer = this.allocatedResources.get(host); + List<SamzaResource> resourcesOnAnyHostBuffer = this.allocatedResources.get(ANY_HOST); - if (resourcesOnTheHost == null || resourcesOnTheHost.isEmpty()) { + // First search for the preferred host buffers + if (resourcesOnPreferredHostBuffer != null && !resourcesOnPreferredHostBuffer.isEmpty()) { + SamzaResource resource = resourcesOnPreferredHostBuffer.get(0); + log.info("Returning a buffered resource: {} for {} from preferred-host buffer.", resource.getResourceID(), host); + return resource; + } else if (resourcesOnAnyHostBuffer != null && !resourcesOnAnyHostBuffer.isEmpty()) { + // If preferred host buffers are empty, scan the ANY_HOST buffer + log.debug("No resources on preferred-host buffer. Scanning ANY_HOST buffer"); + SamzaResource resource = resourcesOnAnyHostBuffer.stream() + .filter(resrc -> resrc.getHost().equals(host)) + .findAny().orElse(null); + if (resource != null) { + log.info("Returning a buffered resource: {} for {} from ANY_HOST buffer.", resource.getResourceID(), host); + } + return resource; + } else { + log.debug("Cannot find any resource in the ANY_HOST buffer for {} because both buffers are empty", host); return null; } - return resourcesOnTheHost.get(0); } } http://git-wip-us.apache.org/repos/asf/samza/blob/03e5026c/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java new file mode 100644 index 0000000..ea05fff --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import org.apache.samza.config.Config; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +public class MockHostAwareContainerAllocator extends HostAwareContainerAllocator { + private static final int ALLOCATOR_TIMEOUT_MS = 10000; + private Semaphore semaphore = new Semaphore(0); + + public MockHostAwareContainerAllocator(ClusterResourceManager manager, + Config config, SamzaApplicationState state) { + super(manager, ALLOCATOR_TIMEOUT_MS, config, state); + } + + /** + * Causes the current thread to block until the expected number of containers have started. + * + * @param numExpectedContainers the number of containers expected to start + * @param timeout the maximum time to wait + * @param unit the time unit of the {@code timeout} argument + * + * @return a boolean that specifies whether containers started within the timeout. + * @throws InterruptedException if the current thread is interrupted while waiting + */ + boolean awaitContainersStart(int numExpectedContainers, long timeout, TimeUnit unit) throws InterruptedException { + return semaphore.tryAcquire(numExpectedContainers, timeout, unit); + } + + @Override + public void requestResources(Map<String, String> containerToHostMappings) { + super.requestResources(containerToHostMappings); + } + + public ResourceRequestState getContainerRequestState() throws Exception { + Field field = AbstractContainerAllocator.class.getDeclaredField("resourceRequestState"); + field.setAccessible(true); + + return (ResourceRequestState) field.get(this); + } + + @Override + protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) { + super.runStreamProcessor(request, preferredHost); + semaphore.release(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/03e5026c/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index 5c2fe4a..d648a80 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -19,6 +19,7 @@ package org.apache.samza.clustermanager; +import com.google.common.collect.ImmutableMap; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -52,7 +53,7 @@ public class TestContainerProcessManager { private static volatile boolean isRunning = false; - private Map<String, String> configVals = new HashMap<String, String>() { + private Map<String, String> configVals = new HashMap<String, String>() { { put("cluster-manager.container.count", "1"); put("cluster-manager.container.retry.count", "1"); @@ -86,15 +87,16 @@ public class TestContainerProcessManager { private SamzaApplicationState state = null; - private JobModelManager getJobModelManagerWithHostAffinity(int containerCount) { + private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) { Map<String, Map<String, String>> localityMap = new HashMap<>(); - localityMap.put("0", new HashMap<String, String>() { { - put(SetContainerHostMapping.HOST_KEY, "abc"); - } }); + containerIdToHost.forEach((containerId, host) -> { + localityMap.put(containerId, ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId))); + }); LocalityManager mockLocalityManager = mock(LocalityManager.class); when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap); - return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerCount, mockLocalityManager, this.server); + return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), + containerIdToHost.size(), mockLocalityManager, this.server); } private JobModelManager getJobModelManagerWithoutHostAffinity(int containerCount) { @@ -140,7 +142,7 @@ public class TestContainerProcessManager { conf.put("cluster-manager.container.memory.mb", "500"); conf.put("cluster-manager.container.cpu.cores", "5"); - state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1)); + state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1"))); taskManager = new ContainerProcessManager( new MapConfig(conf), state, @@ -203,7 +205,7 @@ public class TestContainerProcessManager { Config conf = getConfig(); state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); - ContainerProcessManager taskManager = new ContainerProcessManager( + ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), state, new MetricsRegistryMap(), @@ -228,16 +230,16 @@ public class TestContainerProcessManager { state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); ContainerProcessManager taskManager = new ContainerProcessManager( - new MapConfig(conf), - state, - new MetricsRegistryMap(), + new MapConfig(conf), + state, + new MetricsRegistryMap(), clusterResourceManager ); MockContainerAllocator allocator = new MockContainerAllocator( clusterResourceManager, - conf, - state); + conf, + state); getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); @@ -250,7 +252,7 @@ public class TestContainerProcessManager { assertFalse(taskManager.shouldShutdown()); assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); - SamzaResource container = new SamzaResource(1, 1024, "abc", "id0"); + SamzaResource container = new SamzaResource(1, 1024, "host1", "id0"); taskManager.onResourceAllocated(container); // Allow container to run and update state @@ -299,7 +301,7 @@ public class TestContainerProcessManager { assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); - SamzaResource container = new SamzaResource(1, 1024, "abc", "id0"); + SamzaResource container = new SamzaResource(1, 1024, "host1", "id0"); taskManager.onResourceAllocated(container); // Allow container to run and update state @@ -353,16 +355,16 @@ public class TestContainerProcessManager { state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); ContainerProcessManager taskManager = new ContainerProcessManager( - new MapConfig(conf), - state, - new MetricsRegistryMap(), + new MapConfig(conf), + state, + new MetricsRegistryMap(), clusterResourceManager ); MockContainerAllocator allocator = new MockContainerAllocator( clusterResourceManager, - conf, - state); + conf, + state); getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); Thread thread = new Thread(allocator); @@ -371,7 +373,7 @@ public class TestContainerProcessManager { // Start the task clusterResourceManager taskManager.start(); - SamzaResource container = new SamzaResource(1, 1000, "abc", "id1"); + SamzaResource container = new SamzaResource(1, 1000, "host1", "id1"); taskManager.onResourceAllocated(container); // Allow container to run and update state @@ -391,7 +393,7 @@ public class TestContainerProcessManager { @Test public void testRerequestOnAnyHostIfContainerStartFails() throws Exception { - state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1)); + state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1"))); Map<String, String> configMap = new HashMap<>(); configMap.putAll(getConfig()); @@ -404,7 +406,7 @@ public class TestContainerProcessManager { clusterResourceManager); manager.start(); - SamzaResource resource = new SamzaResource(1, 1024, "abc", "resource-1"); + SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1"); state.pendingContainers.put("1", resource); Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 1); manager.onStreamProcessorLaunchFailure(resource, new Exception("cannot launch container!")); @@ -413,6 +415,72 @@ public class TestContainerProcessManager { } @Test + public void testAllBufferedResourcesAreUtilized() throws Exception { + Map<String, String> config = new HashMap<>(); + config.putAll(getConfigWithHostAffinity()); + config.put("cluster-manager.container.count", "2"); + Config cfg = new MapConfig(config); + // 1. Request two containers on hosts - host1 and host2 + state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1", + "1", "host2"))); + + ContainerProcessManager taskManager = new ContainerProcessManager( + cfg, + state, + new MetricsRegistryMap(), + clusterResourceManager + ); + + MockHostAwareContainerAllocator allocator = new MockHostAwareContainerAllocator( + clusterResourceManager, + cfg, + state); + getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); + + Thread thread = new Thread(allocator); + getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread); + + taskManager.start(); + assertFalse(taskManager.shouldShutdown()); + // 2. When the task manager starts, there should have been a pending request on host1 and host2 + assertEquals(2, allocator.getContainerRequestState().numPendingRequests()); + + // 3. Allocate an extra resource on host1 and no resource on host2 yet. + SamzaResource resource1 = new SamzaResource(1, 1000, "host1", "id1"); + SamzaResource resource2 = new SamzaResource(1, 1000, "host1", "id2"); + taskManager.onResourceAllocated(resource1); + taskManager.onResourceAllocated(resource2); + + // 4. Wait for the container to start on host1 and immediately fail + if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } + taskManager.onStreamProcessorLaunchSuccess(resource1); + assertEquals("host2", allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); + assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); + + taskManager.onResourceCompleted(new SamzaResourceStatus(resource1.getResourceID(), "App Error", 1)); + assertEquals(2, allocator.getContainerRequestState().numPendingRequests()); + + assertFalse(taskManager.shouldShutdown()); + assertFalse(state.jobHealthy.get()); + assertEquals(3, clusterResourceManager.resourceRequests.size()); + assertEquals(0, clusterResourceManager.releasedResources.size()); + + // 5. Do not allocate any further resource on host1, and verify that the re-run of the container on host1 uses the + // previously allocated extra resource + SamzaResource resource3 = new SamzaResource(1, 1000, "host2", "id3"); + taskManager.onResourceAllocated(resource3); + + if (!allocator.awaitContainersStart(2, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } + taskManager.onStreamProcessorLaunchSuccess(resource3); + + assertTrue(state.jobHealthy.get()); + } + + @Test public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception { Config conf = getConfig(); @@ -421,16 +489,16 @@ public class TestContainerProcessManager { state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); ContainerProcessManager taskManager = new ContainerProcessManager( - new MapConfig(conf), - state, - new MetricsRegistryMap(), + new MapConfig(conf), + state, + new MetricsRegistryMap(), clusterResourceManager ); MockContainerAllocator allocator = new MockContainerAllocator( clusterResourceManager, - conf, - state); + conf, + state); getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); Thread thread = new Thread(allocator); @@ -441,7 +509,7 @@ public class TestContainerProcessManager { assertFalse(taskManager.shouldShutdown()); assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); - SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1"); + SamzaResource container1 = new SamzaResource(1, 1000, "host1", "id1"); taskManager.onResourceAllocated(container1); // Allow container to run and update state @@ -462,7 +530,7 @@ public class TestContainerProcessManager { assertEquals(0, clusterResourceManager.releasedResources.size()); assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); - SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2"); + SamzaResource container2 = new SamzaResource(1, 1000, "host1", "id2"); taskManager.onResourceAllocated(container2); // Allow container to run and update state @@ -515,7 +583,7 @@ public class TestContainerProcessManager { assertFalse(taskManager.shouldShutdown()); assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); - SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1"); + SamzaResource container1 = new SamzaResource(1, 1000, "host1", "id1"); taskManager.onResourceAllocated(container1); // Allow container to run and update state @@ -525,7 +593,7 @@ public class TestContainerProcessManager { assertEquals(0, allocator.getContainerRequestState().numPendingRequests()); taskManager.onStreamProcessorLaunchSuccess(container1); // Create container failure - with ContainerExitStatus.DISKS_FAILED - taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL)); + taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "App error", 1)); // The above failure should trigger a container request assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); @@ -535,7 +603,7 @@ public class TestContainerProcessManager { assertEquals(0, clusterResourceManager.releasedResources.size()); assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); - SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2"); + SamzaResource container2 = new SamzaResource(1, 1000, "host1", "id2"); taskManager.onResourceAllocated(container2); // Allow container to run and update state @@ -545,7 +613,7 @@ public class TestContainerProcessManager { taskManager.onStreamProcessorLaunchSuccess(container2); // Create container failure - with ContainerExitStatus.PREEMPTED - taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED)); + taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED)); assertEquals(3, clusterResourceManager.resourceRequests.size()); // The above failure should trigger a container request @@ -553,7 +621,7 @@ public class TestContainerProcessManager { assertFalse(taskManager.shouldShutdown()); assertFalse(state.jobHealthy.get()); assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); - SamzaResource container3 = new SamzaResource(1, 1000, "abc", "id3"); + SamzaResource container3 = new SamzaResource(1, 1000, "host1", "id3"); taskManager.onResourceAllocated(container3); // Allow container to run and update state http://git-wip-us.apache.org/repos/asf/samza/blob/03e5026c/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java index 6260b71..c26d727 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java @@ -138,6 +138,46 @@ public class TestHostAwareContainerAllocator { assertEquals("ID2", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getResourceID()); } + /** + * Test that extra resources are buffered under ANY_HOST + */ + @Test + public void testSurplusResourcesAreBufferedUnderAnyHost() throws Exception { + containerAllocator.requestResources(new HashMap<String, String>() { + { + put("0", "abc"); + put("1", "xyz"); + } + }); + + assertNotNull(requestState.getResourcesOnAHost("abc")); + assertEquals(0, requestState.getResourcesOnAHost("abc").size()); + + assertNotNull(requestState.getResourcesOnAHost("xyz")); + assertEquals(0, requestState.getResourcesOnAHost("xyz").size()); + + assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST)); + + containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID1")); + containerAllocator.addResource(new SamzaResource(1, 10, "xyz", "ID2")); + // surplus resources for host - "abc" + containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID3")); + containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID4")); + containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID5")); + containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID6")); + + assertNotNull(requestState.getResourcesOnAHost("abc")); + assertEquals(1, requestState.getResourcesOnAHost("abc").size()); + + assertNotNull(requestState.getResourcesOnAHost("xyz")); + assertEquals(1, requestState.getResourcesOnAHost("xyz").size()); + + assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST)); + // assert that the surplus resources goto the ANY_HOST buffer + assertTrue(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size() == 4); + assertEquals("ID3", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getResourceID()); + } + @Test public void testAllocatorReleasesExtraContainers() throws Exception { final SamzaResource resource0 = new SamzaResource(1, 1024, "abc", "id1");
