This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9eadad1 SAMZA-2511 : Adding logic to handle container stop fail (#1347)
9eadad1 is described below
commit 9eadad1a4f8f14619bb31c1fda526f656b4f5751
Author: rmatharu <[email protected]>
AuthorDate: Thu Apr 16 10:01:28 2020 -0700
SAMZA-2511 : Adding logic to handle container stop fail (#1347)
* Adding logic to handle container stop fail
* Fixing test
* Adding cancel resource request
* Adding status for stop-failed
* Adding comments
Co-authored-by: Ray Manpreet Singh Matharu <[email protected]>
---
.../clustermanager/ClusterResourceManager.java | 7 ++++
.../samza/clustermanager/ContainerManager.java | 29 ++++++++++++++++
.../clustermanager/ContainerProcessManager.java | 40 ++++++++++++++++------
.../clustermanager/StandbyContainerManager.java | 25 ++++++++++++++
.../placement/ContainerPlacementMetadata.java | 2 +-
.../MockClusterResourceManagerCallback.java | 5 +++
.../samza/job/yarn/YarnClusterResourceManager.java | 3 ++
7 files changed, 100 insertions(+), 11 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 8ea3c30..43b3d36 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -180,6 +180,13 @@ public abstract class ClusterResourceManager {
*/
void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t);
+    /**
+     * Invoked when stopping the processor that runs on the given {@link SamzaResource} fails.
+     *
+     * @param resource the resource hosting the processor that could not be stopped
+     * @param t the error raised while attempting to stop the processor
+     */
+    void onStreamProcessorStopFailure(SamzaResource resource, Throwable t);
+
/***
* This callback is invoked when there is an error in the
ClusterResourceManager. This is
* guaranteed to be invoked when there is an uncaught exception in any
other
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index b9427b3..70a050c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -134,6 +134,12 @@ public class ContainerManager {
} else if (actionStatus ==
ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS) {
LOG.info("Waiting for running container to shutdown due to existing
ContainerPlacement action {}", actionMetaData);
return false;
+ } else if (actionStatus ==
ContainerPlacementMetadata.ContainerStatus.STOP_FAILED) {
+ LOG.info("Shutdown on running container failed for action {}",
actionMetaData);
+ markContainerPlacementActionFailed(actionMetaData,
+ String.format("failed to stop container on current host %s",
actionMetaData.getSourceHost()));
+ resourceRequestState.cancelResourceRequest(request);
+ return true;
} else if (actionStatus ==
ContainerPlacementMetadata.ContainerStatus.STOPPED) {
// If the job has standby containers enabled, always check standby
constraints before issuing a start on container
// Note: Always check constraints against allocated resource, since
preferred host can be ANY_HOST as well
@@ -234,6 +240,29 @@ public class ContainerManager {
}
/**
+   * Handle the container stop failure for active containers and standby (if enabled).
+   *
+   * <p>If the processor has an active container placement action, only the action's container status is set to
+   * {@code STOP_FAILED}. The placement-action handling in this class reacts to that status by marking the action
+   * as failed and cancelling the associated resource request. Otherwise, if a {@link StandbyContainerManager} is
+   * present, the failure is delegated to it.
+   *
+   * @param processorId logical id of the container eg 1,2,3; null if no running processor matched the container
+   * @param containerId last known id of the container deployed
+   * @param containerHost host on which the container was running when the stop failed
+   * @param containerAllocator allocator for requesting resources
+   * TODO: SAMZA-2512 Add integ test for handleContainerStopFail
+   */
+  void handleContainerStopFail(String processorId, String containerId, String containerHost,
+      ContainerAllocator containerAllocator) {
+    if (processorId == null) {
+      LOG.warn("Did not find a running Processor ID for Container ID: {} on host: {}. "
+          + "Ignoring invalid/redundant notification.", containerId, containerHost);
+    } else if (hasActiveContainerPlacementAction(processorId)) {
+      // Do not fail the action here. Setting STOP_FAILED lets the placement-action handling mark the action as
+      // failed and cancel the resource request that was issued for the destination host.
+      ContainerPlacementMetadata metaData = getPlacementActionMetadata(processorId).get();
+      metaData.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.STOP_FAILED);
+    } else if (standbyContainerManager.isPresent()) {
+      standbyContainerManager.get().handleContainerStopFail(processorId, containerId, containerAllocator);
+    } else {
+      // The processor is known, but nothing here tracks its stop: no placement action and no standby manager.
+      LOG.warn("Stop failed for Processor ID: {} on Container ID: {} on host: {} with no active container "
+          + "placement action and no standby container manager. Ignoring notification.",
+          processorId, containerId, containerHost);
+    }
+  }
+
+ /**
* Handles the state update on successful launch of a container, if this
launch is due to a container placement action updates the
* related metadata to report success
*
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index d3962ab..c54918d 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
@@ -299,17 +300,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
*/
public void onResourceCompleted(SamzaResourceStatus resourceStatus) {
String containerId = resourceStatus.getContainerId();
- String processorId = null;
- String hostName = null;
- for (Map.Entry<String, SamzaResource> entry:
state.runningProcessors.entrySet()) {
- if
(entry.getValue().getContainerId().equals(resourceStatus.getContainerId())) {
- LOG.info("Container ID: {} matched running Processor ID: {} on host:
{}", containerId, entry.getKey(), entry.getValue().getHost());
+ Pair<String, String> runningProcessorIdHostname =
getRunningProcessor(containerId);
+ String processorId = runningProcessorIdHostname.getKey();
+ String hostName = runningProcessorIdHostname.getValue();
- processorId = entry.getKey();
- hostName = entry.getValue().getHost();
- break;
- }
- }
if (processorId == null) {
LOG.info("No running Processor ID found for Container ID: {} with
Status: {}. Ignoring redundant notification.", containerId,
resourceStatus.toString());
state.redundantNotifications.incrementAndGet();
@@ -431,6 +425,18 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
containerManager.handleContainerLaunchFail(processorId, containerId,
containerHost, containerAllocator);
}
+  /**
+   * Handles a failure to stop the processor running on {@code resource}. It resolves the running processor ID for
+   * the resource's container and forwards the failure to the container manager.
+   *
+   * @param resource the resource on which the stop failed
+   * @param t the error raised while stopping the processor
+   */
+  @Override
+  public void onStreamProcessorStopFailure(SamzaResource resource, Throwable t) {
+    String containerId = resource.getContainerId();
+    String containerHost = resource.getHost();
+    // Null when no running processor matches this container; ContainerManager logs and ignores that case.
+    String processorId = getRunningProcessor(containerId).getKey();
+    // Pass t as a trailing argument without a placeholder so SLF4J logs its full stack trace.
+    LOG.warn("Stop failed for running Processor ID: {} on Container ID: {} on host: {}",
+        processorId, containerId, containerHost, t);
+
+    // Notify container-manager of the failed container-stop request
+    containerManager.handleContainerStopFail(processorId, containerId, containerHost, containerAllocator);
+  }
+
/**
* An error in the callback terminates the JobCoordinator
* @param e the underlying exception/error
@@ -623,6 +629,20 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
return null;
}
+  /**
+   * Finds the running processor whose resource carries the given container ID.
+   *
+   * @param containerId the container ID to look up
+   * @return a (processor ID, host name) pair, or a pair of nulls when no running processor matches
+   */
+  private Pair<String, String> getRunningProcessor(String containerId) {
+    for (Map.Entry<String, SamzaResource> running : state.runningProcessors.entrySet()) {
+      SamzaResource resource = running.getValue();
+      if (!resource.getContainerId().equals(containerId)) {
+        continue;
+      }
+      String runningProcessorId = running.getKey();
+      String runningHost = resource.getHost();
+      LOG.info("Container ID: {} matched running Processor ID: {} on host: {}", containerId, runningProcessorId,
+          runningHost);
+      return new ImmutablePair<>(runningProcessorId, runningHost);
+    }
+    return new ImmutablePair<>(null, null);
+  }
+
/**
* Request {@link ContainerManager#handleContainerStop} to determine next
step of actions for the stopped container
*/
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index a9d298d..10d5f09 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -127,6 +127,31 @@ public class StandbyContainerManager {
}
/**
+   * Handles a container that failed to stop.
+   * <ul>
+   *   <li>Standby container: if its resource was being stopped for a failover, a failover metadata entry exists
+   *   and the failover continues by re-running standby-aware allocation for the corresponding active
+   *   container.</li>
+   *   <li>Active container: this is an invalid state, so an exception is thrown to alarm/restart.</li>
+   * </ul>
+   *
+   * @param containerID the logical ID (e.g., 0, 1, 2) of the container that failed to stop
+   * @param resourceID ID of the resource on which the container was running
+   * @param containerAllocator allocator used to request resources when continuing the failover
+   * @throws SamzaException if {@code containerID} is not a standby container
+   */
+  public void handleContainerStopFail(String containerID, String resourceID,
+      ContainerAllocator containerAllocator) {
+    if (!StandbyTaskUtil.isStandbyContainer(containerID)) {
+      // A stop-fail callback for an active container should never reach this class; fail loudly.
+      throw new SamzaException("Invalid State. Received stop container fail for container Id: " + containerID);
+    }
+
+    log.info("Handling stop fail for standby-container {}, continuing the failover (if present)", containerID);
+
+    // If this standby's resource was stopped for a failover, a metadata entry is found and the failover
+    // proceeds (selecting another standby or any-host, as standby-aware allocation decides).
+    this.checkIfUsedForFailover(resourceID).ifPresent(
+        metadata -> initiateStandbyAwareAllocation(metadata.activeContainerID, metadata.activeContainerResourceID,
+            containerAllocator));
+  }
+
+ /**
* If a standby container has stopped, then there are two possible cases
* Case 1. during a failover, the standby container was stopped for an
active's start, then we
* 1. request a resource on the standby's host to place the
activeContainer, and
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
index 15c9e1c..0f415d6 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
@@ -40,7 +40,7 @@ public class ContainerPlacementMetadata {
/**
* State to track container failover
*/
- public enum ContainerStatus { RUNNING, STOP_IN_PROGRESS, STOPPED }
+ public enum ContainerStatus { RUNNING, STOP_IN_PROGRESS, STOP_FAILED,
STOPPED }
// Container Placement request message
private final ContainerPlacementRequestMessage requestMessage;
// Host where the container is actively running
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
index 4e6e2c9..f3ab1d2 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
@@ -48,6 +48,11 @@ public class MockClusterResourceManagerCallback implements ClusterResourceManage
}
@Override
+  public void onStreamProcessorStopFailure(SamzaResource resource, Throwable t) {
+    // Intentionally empty: this mock does not record processor stop failures.
+  }
+
+ @Override
public void onError(Throwable e) {
error = e;
}
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index e05b31e..5459a9d 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -570,6 +570,9 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
if (processorId != null) {
log.info("Got stop error notification for Container ID: {} for Processor
ID: {}", containerId, processorId, t);
YarnContainer container = state.runningProcessors.get(processorId);
+ SamzaResource resource = new
SamzaResource(container.resource().getVirtualCores(),
+ container.resource().getMemory(), container.nodeId().getHost(),
containerId.toString());
+ clusterManagerCallback.onStreamProcessorStopFailure(resource, t);
} else {
log.warn("Did not find the running Processor ID for the stop error
notification for Container ID: {}. " +
"Ignoring notification", containerId);