This is an automated email from the ASF dual-hosted git repository. jagadish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 8736797 Improved standby-aware container allocation for active-containers on job redeploys 8736797 is described below commit 87367975c693c4a06f5e56332374c7f95d39eb29 Author: Ray Matharu <rmath...@linkedin.com> AuthorDate: Tue Mar 26 19:11:33 2019 -0700 Improved standby-aware container allocation for active-containers on job redeploys Author: Ray Matharu <rmath...@linkedin.com> Reviewers: Jagadish<jagad...@apache.org> Closes #952 from rmatharu/test-standbyimprovements --- .../clustermanager/StandbyContainerManager.java | 236 ++++++++++++--------- .../samza/job/yarn/YarnClusterResourceManager.java | 5 - 2 files changed, 133 insertions(+), 108 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java index db27238..f240757 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java @@ -25,8 +25,9 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.job.model.JobModel; -import org.apache.samza.storage.kv.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,10 +92,10 @@ public class StandbyContainerManager { case SamzaResourceStatus.DISK_FAIL: case SamzaResourceStatus.ABORTED: case SamzaResourceStatus.PREEMPTED: - initiateActiveContainerFailover(containerID, resourceID, containerAllocator); + initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator); break; - // in all other cases, request-resource for the failed container, but record the resource-request, so that - // if this request expires, we can do a failover -- select a standby to stop & start the active on standby's host + // in all other cases, request-resource for the failed container, but record the resource-request, so that + // if this request expires, we can do a failover -- select a standby to stop & start the active on standby's host default: log.info("Requesting resource for active-container {} on host {}", containerID, preferredHost); SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(containerID, preferredHost); @@ -119,7 +120,7 @@ public class StandbyContainerManager { log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID); containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); } else { - initiateActiveContainerFailover(containerID, resourceID, containerAllocator); + initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator); } } @@ -147,7 +148,8 @@ public class StandbyContainerManager { activeContainerID, standbyContainerHostname, standbyContainerID); // request standbycontainer's host for active-container - SamzaResourceRequest resourceRequestForActive = containerAllocator.getResourceRequest(activeContainerID, standbyContainerHostname); + SamzaResourceRequest resourceRequestForActive = + containerAllocator.getResourceRequest(activeContainerID, standbyContainerHostname); // record the resource request, before issuing it to avoid race with allocation-thread failoverMetadata.get().recordResourceRequest(resourceRequestForActive); containerAllocator.issueResourceRequest(resourceRequestForActive); @@ -161,60 +163,90 @@ public class StandbyContainerManager { } } - /** Method to handle failover for an active container. - * We try to find a standby for the active container, and issue a stop on it. - * If we do not find a standby container, we simply issue an anyhost request to place it. + /** Method to handle standby-aware allocation for an active container. + * We try to find a standby host for the active container, and issue a stop on any standby-containers running on it, + * request resource to place the active on the standby's host, and one to place the standby elsewhere. * - * @param containerID the samzaContainerID of the active-container + * @param activeContainerID the samzaContainerID of the active-container * @param resourceID the samza-resource-ID of the container when it failed (used to index failover-state) */ - private void initiateActiveContainerFailover(String containerID, String resourceID, + private void initiateStandbyAwareAllocation(String activeContainerID, String resourceID, AbstractContainerAllocator containerAllocator) { - Optional<Entry<String, SamzaResource>> standbyContainer = this.selectStandby(containerID, resourceID); + String standbyHost = this.selectStandbyHost(activeContainerID, resourceID); - // If we find a standbyContainer, we initiate a failover - if (standbyContainer.isPresent()) { - - String standbyContainerId = standbyContainer.get().getKey(); - SamzaResource standbyResource = standbyContainer.get().getValue(); - String standbyResourceID = standbyResource.getResourceID(); - String standbyHost = standbyResource.getHost(); - - // update the state - FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(containerID, resourceID); - failoverMetadata.updateStandbyContainer(standbyResourceID, standbyHost); - - log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, " - + "for active container {}", standbyContainerId, standbyResourceID, containerID); - samzaApplicationState.failoversToStandby.incrementAndGet(); - this.clusterResourceManager.stopStreamProcessor(standbyResource); + // if the standbyHost returned is anyhost, we proceed with the request directly + if (standbyHost.equals(ResourceRequestState.ANY_HOST)) { + log.info("No standby container found for active container {}, making a resource-request for placing {} on {}, active's resourceID: {}", + activeContainerID, activeContainerID, ResourceRequestState.ANY_HOST, resourceID); + samzaApplicationState.failoversToAnyHost.incrementAndGet(); + containerAllocator.requestResource(activeContainerID, ResourceRequestState.ANY_HOST); } else { - // If we dont find a standbyContainer, we proceed with the ANYHOST request - log.info("No standby container found for active container {}, making a request for {}", containerID, - ResourceRequestState.ANY_HOST); - samzaApplicationState.failoversToAnyHost.incrementAndGet(); - containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); + // Check if there is a running standby-container on that host that needs to be stopped + List<String> standbySamzaContainerIds = this.standbyContainerConstraints.get(activeContainerID); + + Map<String, SamzaResource> runningStandbyContainersOnHost = new HashMap<>(); + this.samzaApplicationState.runningContainers.forEach((samzaContainerId, samzaResource) -> { + if (standbySamzaContainerIds.contains(samzaContainerId) && samzaResource.getHost().equals(standbyHost)) { + runningStandbyContainersOnHost.put(samzaContainerId, samzaResource); + } + }); + + if (runningStandbyContainersOnHost.isEmpty()) { + // if there are no running standby-containers on the standbyHost, we proceed to directly make a resource request + + log.info("No running standby container to stop on host {}, making a resource-request for placing {} on {}, active's resourceID: {}", + standbyHost, activeContainerID, standbyHost, resourceID); + FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID); + + // record the resource request, before issuing it to avoid race with allocation-thread + SamzaResourceRequest resourceRequestForActive = + containerAllocator.getResourceRequest(activeContainerID, standbyHost); + failoverMetadata.recordResourceRequest(resourceRequestForActive); + containerAllocator.issueResourceRequest(resourceRequestForActive); + samzaApplicationState.failoversToStandby.incrementAndGet(); + } else { + // if there is a running standby-container on the standbyHost, we issue a stop (the stopComplete callback completes the remainder of the flow) + FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID); + + runningStandbyContainersOnHost.forEach((standbyContainerID, standbyResource) -> { + log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, " + + "for active container {}", runningStandbyContainersOnHost.keySet(), + runningStandbyContainersOnHost.values(), activeContainerID); + failoverMetadata.updateStandbyContainer(standbyResource.getResourceID(), standbyResource.getHost()); + samzaApplicationState.failoversToStandby.incrementAndGet(); + this.clusterResourceManager.stopStreamProcessor(standbyResource); + }); + + // if multiple standbys are on the same host, we are in an invalid state, so we fail the deploy and retry + if (runningStandbyContainersOnHost.size() > 1) { + throw new SamzaException( + "Invalid State. Multiple standby containers found running on one host:" + runningStandbyContainersOnHost); + } + } } } /** - * Method to select a standby container for a given active container that has stopped. - * TODO: enrich this method to select standby's intelligently based on lag, timestamp, load-balencing, etc. + * Method to select a standby host for a given active container. + * 1. We first try to select a host which has a running standby-container, that we haven't already selected for failover. + * 2. If we dont any such host, we iterate over last-known standbyHosts, if we haven't already selected it for failover. + * 3. If still dont find a host, we fall back to AnyHost. + * + * See https://issues.apache.org/jira/browse/SAMZA-2140 + * * @param activeContainerID Samza containerID of the active container * @param activeContainerResourceID ResourceID of the active container at the time of its last failure - * @return + * @return standby host for the active container (if found), Any-host otherwise. */ - private Optional<Entry<String, SamzaResource>> selectStandby(String activeContainerID, - String activeContainerResourceID) { + private String selectStandbyHost(String activeContainerID, String activeContainerResourceID) { - log.info("Standby containers {} for active container {}", this.standbyContainerConstraints.get(activeContainerID), activeContainerID); + log.info("Standby containers {} for active container {} (resourceID {})", this.standbyContainerConstraints.get(activeContainerID), activeContainerID, activeContainerResourceID); // obtain any existing failover metadata - Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = - activeContainerResourceID == null ? Optional.empty() : this.getFailoverMetadata(activeContainerResourceID); + Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(activeContainerResourceID); // Iterate over the list of running standby containers, to find a standby resource that we have not already // used for a failover for this active resoruce @@ -224,17 +256,39 @@ public class StandbyContainerManager { SamzaResource standbyContainerResource = samzaApplicationState.runningContainers.get(standbyContainerID); // use this standby if there was no previous failover for which this standbyResource was used - if (!(failoverMetadata.isPresent() && failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID()))) { + if (!(failoverMetadata.isPresent() && failoverMetadata.get() + .isStandbyResourceUsed(standbyContainerResource.getResourceID()))) { - log.info("Returning standby container {} in running state for active container {}", standbyContainerID, - activeContainerID); - return Optional.of(new Entry<>(standbyContainerID, standbyContainerResource)); + log.info("Returning standby container {} in running state on host {} for active container {}", + standbyContainerID, standbyContainerResource.getHost(), activeContainerID); + return standbyContainerResource.getHost(); } } } - log.info("Did not find any running standby container for active container {}", activeContainerID); - return Optional.empty(); + + // We iterate over the list of last-known standbyHosts to check if anyone of them has not already been tried + for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) { + + String standbyHost = this.samzaApplicationState.jobModelManager.jobModel(). + getContainerToHostValue(standbyContainerID, SetContainerHostMapping.HOST_KEY); + + if (standbyHost == null || standbyHost.isEmpty()) { + log.info("No last known standbyHost for container {}", standbyContainerID); + + } else if (failoverMetadata.isPresent() && failoverMetadata.get().isStandbyHostUsed(standbyHost)) { + + log.info("Not using standby host {} for active container {} because it had already been selected", standbyHost, + activeContainerID); + } else { + // standbyHost is valid and has not already been selected + log.info("Returning standby host {} for active container {}", standbyHost, activeContainerID); + return standbyHost; + } + } + + log.info("Did not find any standby host for active container {}, returning any-host", activeContainerID); + return ResourceRequestState.ANY_HOST; } /** @@ -321,11 +375,13 @@ public class StandbyContainerManager { if (checkStandbyConstraints(request, samzaResource)) { // This resource can be used to launch this container - log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID, samzaResource.getHost(), preferredHost); + log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID, + samzaResource.getHost(), preferredHost); containerAllocator.runStreamProcessor(request, preferredHost); } else if (StandbyTaskUtil.isStandbyContainer(containerID)) { // This resource cannot be used to launch this standby container, so we make a new anyhost request - log.info("Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request", + log.info( + "Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request", containerID, samzaResource.getHost()); resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost); resourceRequestState.cancelResourceRequest(request); @@ -333,24 +389,16 @@ public class StandbyContainerManager { samzaApplicationState.failedStandbyAllocations.incrementAndGet(); } else { // This resource cannot be used to launch this active container container, so we initiate a failover - log.warn("Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource", + log.warn( + "Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource", containerID, samzaResource.getHost()); resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost); resourceRequestState.cancelResourceRequest(request); Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request); - - // if this active-container has never failed, then simple request anyhost - if (!failoverMetadata.isPresent()) { - log.info("Requesting ANY_HOST for active container {}", containerID); - containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); - } else { - log.info("Initiating failover for active container {}", containerID); - // we use the activeContainer's last resourceID to initiate the failover - String lastKnownResourceID = failoverMetadata.get().activeContainerResourceID; - initiateActiveContainerFailover(containerID, lastKnownResourceID, containerAllocator); - } - + String lastKnownResourceID = + failoverMetadata.isPresent() ? failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID; + initiateStandbyAwareAllocation(containerID, lastKnownResourceID, containerAllocator); samzaApplicationState.failedStandbyAllocations.incrementAndGet(); } } @@ -368,9 +416,10 @@ public class StandbyContainerManager { ResourceRequestState resourceRequestState) { if (StandbyTaskUtil.isStandbyContainer(containerID)) { - handleExpiredRequestForStandbyContainer(containerID, request, alternativeResource, containerAllocator, resourceRequestState); + handleExpiredRequestForStandbyContainer(containerID, request, alternativeResource, containerAllocator, + resourceRequestState); } else { - handleExpiredRequestForActiveContainer(containerID, request, alternativeResource, containerAllocator, resourceRequestState); + handleExpiredRequestForActiveContainer(containerID, request, containerAllocator, resourceRequestState); } } @@ -382,10 +431,10 @@ public class StandbyContainerManager { if (alternativeResource.isPresent()) { // A standby container can be started on the anyhost-alternative-resource rightaway provided it passes all the // standby constraints - log.info("Handling expired request, standby container {} can be started on alternative resource {}", containerID, alternativeResource.get()); + log.info("Handling expired request, standby container {} can be started on alternative resource {}", containerID, + alternativeResource.get()); checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternativeResource.get(), containerAllocator, resourceRequestState); - } else { // If there is no alternative-resource for the standby container we make a new anyhost request log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID); @@ -396,45 +445,20 @@ public class StandbyContainerManager { // Handle an expired resource request that was made for placing an active container private void handleExpiredRequestForActiveContainer(String containerID, SamzaResourceRequest request, - Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator, - ResourceRequestState resourceRequestState) { + AbstractContainerAllocator containerAllocator, ResourceRequestState resourceRequestState) { + log.info("Handling expired request for active container {}", containerID); Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request); - - // An active container can be started on the alternative-any-host resource rightaway, if it has no prior failure, - // that is, there is no failoverMetadata associated with this resource-request - if (alternativeResource.isPresent() && !failoverMetadata.isPresent()) { - - log.info("Handling expired request, trying to run active container {} on alternative resource {}", containerID, alternativeResource.get()); - - checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternativeResource.get(), - containerAllocator, resourceRequestState); - - } else if (!failoverMetadata.isPresent() && !alternativeResource.isPresent()) { - // An active container has no prior failure, and there is no-alternative-anyhost resource, so we make a new anyhost request - log.info("Handling expired request, requesting anyHost resource for active container {} because this active container has never failed", containerID); - - resourceRequestState.cancelResourceRequest(request); - containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); - - } else if (failoverMetadata.isPresent()) { - // An active container that had failed, and whose subsequent resource request has expired, needs to be failed over to - // a new standby-candidate, so we initiate a failover - - log.info("Handling expired request, initiating failover for active container {}", containerID); - - resourceRequestState.cancelResourceRequest(request); - - // we use the activeContainer's resourceID to initiate the failover - String lastKnownResourceID = failoverMetadata.get().activeContainerResourceID; - initiateActiveContainerFailover(containerID, lastKnownResourceID, containerAllocator); - - } else { - log.error("Handling expired request, invalid state containerID {}, resource request {}", containerID, request); - } + resourceRequestState.cancelResourceRequest(request); + + // we use the activeContainer's resourceID to initiate the failover and index failover-state + // if there is no prior failure for this active-Container, we use "unknown" + String lastKnownResourceID = + failoverMetadata.isPresent() ? failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID; + log.info("Handling expired request for active container {}, lastKnownResourceID is {}", containerID, lastKnownResourceID); + initiateStandbyAwareAllocation(containerID, lastKnownResourceID, containerAllocator); } - /** * Check if a activeContainerResource has failover-metadata associated with it */ @@ -455,7 +479,6 @@ public class StandbyContainerManager { return Optional.empty(); } - @Override public String toString() { return this.failovers.toString(); @@ -501,11 +524,18 @@ public class StandbyContainerManager { this.resourceRequests.add(samzaResourceRequest); } - // Check if this resource request is used for this failover + // Check if this resource request has been issued in this failover public synchronized boolean containsResourceRequest(SamzaResourceRequest samzaResourceRequest) { return this.resourceRequests.contains(samzaResourceRequest); } + // Check if this standby-host has been used in this failover, that is, if this host matches the host of a selected-standby, + // or if we have made a resourceRequest for this host + public boolean isStandbyHostUsed(String standbyHost) { + return this.selectedStandbyContainers.values().contains(standbyHost) + || this.resourceRequests.stream().filter(request -> request.getPreferredHost().equals(standbyHost)).count() > 0; + } + @Override public String toString() { return "[activeContainerID: " + this.activeContainerID + " activeContainerResourceID: " diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 6d0fdb1..3a66573 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -563,11 +563,6 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement if (samzaContainerId != null) { YarnContainer container = state.runningYarnContainers.get(samzaContainerId); log.info("Failed Stop on Yarn Container: {} had Samza ContainerId: {} ", containerId.toString(), samzaContainerId); - SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(), - container.resource().getMemory(), container.nodeId().getHost(), containerId.toString()); - - log.info("Re-invoking stop stream processor for container: {}", containerId); - this.stopStreamProcessor(resource);// For now, we retry the stopping of the container } else { log.info("Got an invalid notification for container: {}", containerId.toString()); }