This is an automated email from the ASF dual-hosted git repository. rayman pushed a commit to branch revert-1180-container-placement-service in repository https://gitbox.apache.org/repos/asf/samza.git
commit badf8144bdf6d78d232d58e0c333db8cdc22d98f Author: rmatharu <[email protected]> AuthorDate: Tue Nov 5 16:08:02 2019 -0800 Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers" --- .../samza/clustermanager/ContainerAllocator.java | 50 ++++++- .../samza/clustermanager/ContainerManager.java | 164 --------------------- .../clustermanager/ContainerProcessManager.java | 55 ++++--- .../MockContainerAllocatorWithHostAffinity.java | 5 +- .../MockContainerAllocatorWithoutHostAffinity.java | 9 +- .../TestContainerAllocatorWithHostAffinity.java | 40 +++-- .../TestContainerAllocatorWithoutHostAffinity.java | 15 +- .../TestContainerProcessManager.java | 53 ++----- 8 files changed, 129 insertions(+), 262 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java index 38659f3..89855dc 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java @@ -18,6 +18,7 @@ */ package org.apache.samza.clustermanager; +import com.google.common.annotations.VisibleForTesting; import java.time.Duration; import java.time.Instant; import java.util.Map; @@ -114,13 +115,13 @@ public class ContainerAllocator implements Runnable { */ private final int requestExpiryTimeout; - private final ContainerManager containerManager; + private final Optional<StandbyContainerManager> standbyContainerManager; public ContainerAllocator(ClusterResourceManager clusterResourceManager, Config config, SamzaApplicationState state, boolean hostAffinityEnabled, - ContainerManager containerManager) { + Optional<StandbyContainerManager> standbyContainerManager) { ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); this.clusterResourceManager = clusterResourceManager; this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime(); @@ -131,7 +132,7 @@ public class ContainerAllocator implements Runnable { this.state = state; this.config = config; this.hostAffinityEnabled = hostAffinityEnabled; - this.containerManager = containerManager; + this.standbyContainerManager = standbyContainerManager; this.requestExpiryTimeout = clusterManagerConfig.getContainerRequestTimeout(); } @@ -203,7 +204,12 @@ public class ContainerAllocator implements Runnable { state.matchedResourceRequests.incrementAndGet(); } - containerManager.handleContainerLaunch(request, preferredHost, peekAllocatedResource(preferredHost), resourceRequestState, this); + // If hot-standby is enabled, check standby constraints are met before launching a processor + if (this.standbyContainerManager.isPresent()) { + checkStandByContrainsAndRunStreamProcessor(request, preferredHost); + } else { + runStreamProcessor(request, preferredHost); + } } else { @@ -214,7 +220,7 @@ public class ContainerAllocator implements Runnable { if (expired) { updateExpiryMetrics(request); if (hostAffinityEnabled) { - containerManager.handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request, this, resourceRequestState); + handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request); } } else { LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet." @@ -227,6 +233,30 @@ public class ContainerAllocator implements Runnable { } /** + * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained + * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise + * issues an ANY_HOST request. + */ + @VisibleForTesting + void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost, + SamzaResourceRequest request) { + boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST); + if (standbyContainerManager.isPresent()) { + standbyContainerManager.get() + .handleExpiredResourceRequest(processorId, request, + Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), this, resourceRequestState); + } else if (resourceAvailableOnAnyHost) { + LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost); + runStreamProcessor(request, ResourceRequestState.ANY_HOST); + } else { + LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.", + processorId, preferredHost); + resourceRequestState.cancelResourceRequest(request); + requestResource(processorId, ResourceRequestState.ANY_HOST); + } + } + + /** * Updates the request state and runs a processor on the specified host. Assumes a resource * is available on the preferred host, so the caller must verify that before invoking this method. * @@ -261,6 +291,16 @@ public class ContainerAllocator implements Runnable { } /** + * If {@code StandbyContainerManager} is present check standBy constraints are met before attempting to launch + * @param request outstanding request which has an allocated resource + * @param preferredHost to run the request + */ + private void checkStandByContrainsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost) { + standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, + peekAllocatedResource(preferredHost), this, resourceRequestState); + } + + /** * Called during initial request for resources * * @param processorToHostMapping A Map of [processorId, hostName], where processorId is the ID of the Samza processor diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java deleted file mode 100644 index 6b29bc8..0000000 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.clustermanager; - -import com.google.common.annotations.VisibleForTesting; -import java.time.Duration; -import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * ContainerManager is a centralized entity that manages control actions like start, stop for both active and standby containers - * ContainerManager acts as a brain for validating and issuing any actions on containers in the Job Coordinator. - * - * The requests to allocate resources resources made by {@link ContainerAllocator} can either expire or succeed. - * When the requests succeeds the ContainerManager validates those requests before starting the container - * When the requests expires the ContainerManager decides the next set of actions for the pending request. - * - * Callbacks issued from {@link ClusterResourceManager} aka {@link ContainerProcessManager} are intercepted - * by ContainerManager to handle container failure and completions for both active and standby containers - */ -public class ContainerManager { - - private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class); - - /** - * Resource-manager, used to stop containers - */ - private final ClusterResourceManager clusterResourceManager; - private final SamzaApplicationState samzaApplicationState; - - private final Optional<StandbyContainerManager> standbyContainerManager; - - public ContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager, - Boolean standByEnabled) { - this.samzaApplicationState = samzaApplicationState; - this.clusterResourceManager = clusterResourceManager; - // Enable standby container manager if required - if (standByEnabled) { - this.standbyContainerManager = - Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager)); - } else { - this.standbyContainerManager = Optional.empty(); - } - } - - /** - * Handles the container start action for both active & standby containers. - * - * @param request pending request for the preferred host - * @param preferredHost preferred host to start the container - * @param allocatedResource resource allocated from {@link ClusterResourceManager} - * @param resourceRequestState state of request in {@link ContainerAllocator} - * @param allocator to request resources from @{@link ClusterResourceManager} - */ - void handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource, - ResourceRequestState resourceRequestState, ContainerAllocator allocator) { - if (this.standbyContainerManager.isPresent()) { - standbyContainerManager.get() - .checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, allocatedResource, allocator, - resourceRequestState); - } else { - allocator.runStreamProcessor(request, preferredHost); - } - } - - /** - * Handles the action to be taken after the container has been stopped. - * Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerStop} to check constraints. - * Case 2. When standby is disabled there are two cases according to host-affinity being enabled - * Case 2.1. When host-affinity is enabled resources are requested on host where container was last seen - * Case 2.2. When host-affinity is disabled resources are requested for ANY_HOST - * - * @param processorId logical id of the container - * @param containerId last known id of the container deployed - * @param preferredHost host on which container was last deployed - * @param exitStatus exit code returned by the container - * @param preferredHostRetryDelay delay to be incurred before requesting resources - * @param containerAllocator allocator for requesting resources - */ - void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus, - Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) { - if (standbyContainerManager.isPresent()) { - standbyContainerManager.get() - .handleContainerStop(processorId, containerId, preferredHost, exitStatus, containerAllocator, - preferredHostRetryDelay); - } else { - // If StandbyTasks are not enabled, we simply make a request for the preferredHost - containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay); - } - } - - /** - * Handle the container launch failure for active containers and standby (if enabled). - * Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerLaunchFail} to check behavior - * Case 2. When standby is disabled the allocator issues a request for ANY_HOST resources - * - * @param processorId logical id of the container - * @param containerId last known id of the container deployed - * @param preferredHost host on which container is requested to be deployed - * @param containerAllocator allocator for requesting resources - */ - void handleContainerLaunchFail(String processorId, String containerId, String preferredHost, - ContainerAllocator containerAllocator) { - if (processorId != null && standbyContainerManager.isPresent()) { - standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator); - } else if (processorId != null) { - LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}", - processorId, containerId, preferredHost); - containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST); - } else { - LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " - + "Ignoring invalid/redundant notification.", containerId, preferredHost); - } - } - - /** - * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained - * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise - * issues an ANY_HOST request. Only applies to HOST_AFFINITY enabled cases - * - * @param processorId logical id of the container - * @param preferredHost host on which container is requested to be deployed - * @param request pending request for the preferred host - * @param allocator allocator for requesting resources - * @param resourceRequestState state of request in {@link ContainerAllocator} - */ - @VisibleForTesting - void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost, - SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) { - boolean resourceAvailableOnAnyHost = allocator.hasAllocatedResource(ResourceRequestState.ANY_HOST); - if (standbyContainerManager.isPresent()) { - standbyContainerManager.get() - .handleExpiredResourceRequest(processorId, request, - Optional.ofNullable(allocator.peekAllocatedResource(ResourceRequestState.ANY_HOST)), allocator, - resourceRequestState); - } else if (resourceAvailableOnAnyHost) { - LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost); - allocator.runStreamProcessor(request, ResourceRequestState.ANY_HOST); - } else { - LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.", - processorId, preferredHost); - resourceRequestState.cancelResourceRequest(request); - allocator.requestResource(processorId, ResourceRequestState.ANY_HOST); - } - } -} diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index 398ed6d..cb0e537 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -95,8 +95,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback private final ContainerAllocator containerAllocator; private final Thread allocatorThread; - // The ContainerManager manages control actions for both active & standby containers - private final ContainerManager containerManager; + // The StandbyContainerManager manages standby-aware allocation and failover of containers + private final Optional<StandbyContainerManager> standbyContainerManager; private final Option<DiagnosticsManager> diagnosticsManager; @@ -158,9 +158,14 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback // Wire all metrics to all reporters this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry)); - this.containerManager = new ContainerManager(state, clusterResourceManager, jobConfig.getStandbyTasksEnabled()); + // Enable standby container manager if required + if (jobConfig.getStandbyTasksEnabled()) { + this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager)); + } else { + this.standbyContainerManager = Optional.empty(); + } - this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager); + this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager); this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); LOG.info("Finished container process manager initialization."); } @@ -170,8 +175,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback SamzaApplicationState state, MetricsRegistryMap registry, ClusterResourceManager resourceManager, - Optional<ContainerAllocator> allocator, - ContainerManager containerManager) { + Optional<ContainerAllocator> allocator) { this.state = state; this.clusterManagerConfig = clusterManagerConfig; this.jobConfig = new JobConfig(clusterManagerConfig); @@ -179,11 +183,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled(); this.clusterResourceManager = resourceManager; - this.containerManager = containerManager; + this.standbyContainerManager = Optional.empty(); this.diagnosticsManager = Option.empty(); this.containerAllocator = allocator.orElseGet( () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state, - hostAffinityEnabled, this.containerManager)); + hostAffinityEnabled, this.standbyContainerManager)); this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); LOG.info("Finished container process manager initialization"); } @@ -423,7 +427,16 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback // 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host, if standby are not enabled // otherwise calling standbyContainerManager - containerManager.handleContainerLaunchFail(processorId, containerId, containerHost, containerAllocator); + if (processorId != null && standbyContainerManager.isPresent()) { + standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator); + } else if (processorId != null) { + LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}", + processorId, containerId, containerHost); + containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST); + } else { + LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " + + "Ignoring invalid/redundant notification.", containerId, containerHost); + } } /** @@ -591,25 +604,25 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback /** * Obtains the ID of the Samza processor pending launch on the provided resource (container). * - * ContainerProcessManager [INFO] Container ID: container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on host: ltx1-app0772.stg.linkedin.com - * - * @param containerId last known id of the container deployed - * @return the logical processorId of the processor (e.g., 0, 1, 2 ...) + * @param resourceId the ID of the resource (container) + * @return the ID of the Samza processor on this resource */ - private String getPendingProcessorId(String containerId) { + private String getPendingProcessorId(String resourceId) { for (Map.Entry<String, SamzaResource> entry: state.pendingProcessors.entrySet()) { - if (entry.getValue().getContainerId().equals(containerId)) { - LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost()); + if (entry.getValue().getContainerId().equals(resourceId)) { + LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", resourceId, entry.getKey(), entry.getValue().getHost()); return entry.getKey(); } } return null; } - /** - * Request {@link ContainerManager#handleContainerStop} to determine next step of actions for the stopped container - */ - private void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) { - containerManager.handleContainerStop(processorId, containerId, preferredHost, exitStatus, preferredHostRetryDelay, containerAllocator); + private void handleContainerStop(String processorId, String resourceID, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) { + if (standbyContainerManager.isPresent()) { + standbyContainerManager.get().handleContainerStop(processorId, resourceID, preferredHost, exitStatus, containerAllocator, preferredHostRetryDelay); + } else { + // If StandbyTasks are not enabled, we simply make a request for the preferredHost + containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay); + } } } diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java index 649435a..99ef433 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java @@ -22,6 +22,7 @@ import org.apache.samza.config.Config; import java.lang.reflect.Field; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -29,8 +30,8 @@ public class MockContainerAllocatorWithHostAffinity extends ContainerAllocator { private Semaphore semaphore = new Semaphore(0); public MockContainerAllocatorWithHostAffinity(ClusterResourceManager manager, - Config config, SamzaApplicationState state, ContainerManager containerManager) { - super(manager, config, state, true, containerManager); + Config config, SamzaApplicationState state) { + super(manager, config, state, true, Optional.empty()); } /** diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java index 7448e57..0b3ff80 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java @@ -18,9 +18,11 @@ */ package org.apache.samza.clustermanager; +import java.util.Optional; import org.apache.samza.config.Config; import java.lang.reflect.Field; + import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -29,9 +31,10 @@ public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocato public int requestedContainers = 0; private Semaphore semaphore = new Semaphore(0); - public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager, - Config config, SamzaApplicationState state, ContainerManager containerManager) { - super(resourceManager, config, state, false, containerManager); + public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager manager, + Config config, + SamzaApplicationState state) { + super(manager, config, state, false, Optional.empty()); } /** diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java index 3451c4c..927df89 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -60,7 +61,6 @@ public class TestContainerAllocatorWithHostAffinity { private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager); private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); - private final ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); private JobModelManager initializeJobModelManager(Config config, int containerCount) { Map<String, Map<String, String>> localityMap = new HashMap<>(); @@ -83,8 +83,7 @@ public class TestContainerAllocatorWithHostAffinity { @Before public void setup() throws Exception { - containerAllocator = - new ContainerAllocator(clusterResourceManager, config, state, true, containerManager); + containerAllocator = new ContainerAllocator(clusterResourceManager, config, state, true, Optional.empty()); requestState = new MockContainerRequestState(clusterResourceManager, true); Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState"); requestStateField.setAccessible(true); @@ -360,8 +359,6 @@ public class TestContainerAllocatorWithHostAffinity { @Test public void testRequestAllocationOnPreferredHostWithRunStreamProcessor() throws Exception { ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class); - ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state); - ContainerManager containerManager = new ContainerManager(state, mockClusterResourceManager, false); // Mock the callback from ClusterManager to add resources to the allocator doAnswer((InvocationOnMock invocation) -> { SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0); @@ -370,7 +367,8 @@ public class TestContainerAllocatorWithHostAffinity { }).when(mockCPM).onResourcesAvailable(anyList()); spyAllocator = Mockito.spy( - new ContainerAllocator(mockClusterResourceManager, config, state, true, containerManager)); + new ContainerAllocator(new MockClusterResourceManager(mockCPM, state), config, state, true, + Optional.empty())); // Request Resources spyAllocator.requestResources(new HashMap<String, String>() { @@ -407,9 +405,8 @@ public class TestContainerAllocatorWithHostAffinity { @Test public void testExpiredRequestAllocationOnAnyHost() throws Exception { MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state)); - ContainerManager spyContainerManager = spy(new ContainerManager(state, spyManager, false)); - spyAllocator = Mockito.spy( - new ContainerAllocator(spyManager, config, state, true, spyContainerManager)); + spyAllocator = Mockito.spy(new ContainerAllocator(spyManager, config, state, true, Optional.empty())); + // Request Preferred Resources spyAllocator.requestResources(new HashMap<String, String>() { { @@ -428,10 +425,10 @@ public class TestContainerAllocatorWithHostAffinity { // Verify that all the request that were created as preferred host requests expired assertTrue(state.preferredHostRequests.get() == 2); assertTrue(state.expiredPreferredHostRequests.get() == 2); - verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"), - any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class)); - verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"), - any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class)); + verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"), + any(SamzaResourceRequest.class)); + verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"), + any(SamzaResourceRequest.class)); // Verify that preferred host request were cancelled and since no surplus resources were available // requestResource was invoked with ANY_HOST requests @@ -449,11 +446,10 @@ public class TestContainerAllocatorWithHostAffinity { @Test public void testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor() throws Exception { // Add Extra Resources - MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state)); - ContainerManager spyContainerManager = spy(new ContainerManager(state, spyClusterResourceManager, false)); - spyAllocator = Mockito.spy( - new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager)); + new ContainerAllocator(new MockClusterResourceManager(callback, state), config, state, true, + Optional.empty())); + spyAllocator.addResource(new SamzaResource(1, 1000, "xyz", "id1")); spyAllocator.addResource(new SamzaResource(1, 1000, "zzz", "id2")); @@ -473,11 +469,11 @@ public class TestContainerAllocatorWithHostAffinity { Thread.sleep(100); // Verify that all the request that were created as preferred host requests expired - assertEquals(state.expiredPreferredHostRequests.get(), 2); - verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"), - any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class)); - verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"), - any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class)); + assertTrue(state.expiredPreferredHostRequests.get() == 2); + verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"), + any(SamzaResourceRequest.class)); + verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"), + any(SamzaResourceRequest.class)); // Verify that runStreamProcessor was invoked with already available ANY_HOST requests ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class); diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java index 9b0d1b7..16eac0b 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java @@ -23,6 +23,7 @@ import java.lang.reflect.Field; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.coordinator.JobModelManager; @@ -66,8 +67,7 @@ public class TestContainerAllocatorWithoutHostAffinity { @Before public void setup() throws Exception { - containerAllocator = new ContainerAllocator(manager, config, state, false, - new ContainerManager(state, manager, false)); + containerAllocator = new ContainerAllocator(manager, config, state, false, Optional.empty()); requestState = new MockContainerRequestState(manager, false); Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState"); requestStateField.setAccessible(true); @@ -261,10 +261,9 @@ public class TestContainerAllocatorWithoutHostAffinity { }; ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class); - ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state); - ContainerManager spyContainerManager = spy(new ContainerManager(state, mockManager, false)); spyAllocator = Mockito.spy( - new ContainerAllocator(mockManager, config, state, false, spyContainerManager)); + new ContainerAllocator(new MockClusterResourceManager(mockCPM, state), config, state, false, + Optional.empty())); // Mock the callback from ClusterManager to add resources to the allocator doAnswer((InvocationOnMock invocation) -> { SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0); @@ -283,9 +282,9 @@ public class TestContainerAllocatorWithoutHostAffinity { resourceRequestCaptor.getAllValues() .forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST)); assertTrue(state.anyHostRequests.get() == containersToHostMapping.size()); - // Expiry currently should not be invoked for host affinity enabled cases only - verify(spyContainerManager, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(), - any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class)); + // Expiry currently is only handled for host affinity enabled cases + verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(), + any(SamzaResourceRequest.class)); // Only updated when host affinity is enabled assertTrue(state.matchedResourceRequests.get() == 0); assertTrue(state.preferredHostRequests.get() == 0); diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index 639b247..f100393 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -141,7 +141,7 @@ public class TestContainerProcessManager { SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); + ContainerProcessManager cpm = buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty()); @@ -165,8 +165,7 @@ public class TestContainerProcessManager { state, new MetricsRegistryMap(), clusterResourceManager, - Optional.empty(), - containerManager + Optional.empty() ); allocator = @@ -184,7 +183,6 @@ public class TestContainerProcessManager { MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf)); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); ContainerProcessManager cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); @@ -192,8 +190,7 @@ public class TestContainerProcessManager { MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, conf, - state, - containerManager); + state); getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator); CountDownLatch latch = new CountDownLatch(1); @@ -250,13 +247,11 @@ public class TestContainerProcessManager { MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf)); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, conf, - state, - containerManager); + state); ContainerProcessManager cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); @@ -296,13 +291,11 @@ public class TestContainerProcessManager { MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf)); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, conf, - state, - containerManager); + state); ContainerProcessManager cpm = spy( buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); @@ -393,13 +386,11 @@ public class TestContainerProcessManager { SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, clusterManagerConfig, - state, - containerManager); + state); ContainerProcessManager cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)); @@ -477,13 +468,11 @@ public class TestContainerProcessManager { SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, clusterManagerConfig, - state, - containerManager); + state); ContainerProcessManager cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)); @@ -585,13 +574,11 @@ public class TestContainerProcessManager { MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf)); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, conf, - state, - containerManager); + state); ContainerProcessManager cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); @@ -625,17 +612,15 @@ public class TestContainerProcessManager { configMap.putAll(getConfig()); MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, new MapConfig(config), - state, - containerManager); + state); ContainerProcessManager manager = - new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(), clusterResourceManager, - Optional.of(allocator), containerManager); + new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(), + clusterResourceManager, Optional.of(allocator)); manager.start(); SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1"); @@ -659,13 +644,11 @@ public class TestContainerProcessManager { "1", "host2"))); MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity( clusterResourceManager, cfg, - state, - containerManager); + state); ContainerProcessManager cpm = spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator))); @@ -722,13 +705,11 @@ public class TestContainerProcessManager { MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(conf))); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, conf, - state, - containerManager); + state); ContainerProcessManager cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); @@ -795,13 +776,11 @@ public class TestContainerProcessManager { MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(config))); - ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, conf, - state, - containerManager); + state); ContainerProcessManager cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); @@ -887,7 +866,7 @@ public class TestContainerProcessManager { private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state, ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) { - return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager, allocator, - new ContainerManager(state, clusterResourceManager, false)); + return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager, + allocator); } }
