This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 1087683  SAMZA-2610: Handle Metadata changes for AM HA orchestration 
(#1450)
1087683 is described below

commit 108768393960a893f9d202edf50bf47d2b1af590
Author: mynameborat <[email protected]>
AuthorDate: Wed Dec 9 09:33:11 2020 -0800

    SAMZA-2610: Handle Metadata changes for AM HA orchestration (#1450)
    
    Description:
    AM performs planning and job model generation for every incarnation. With 
AM-HA, the new job model or configuration may invalidate the containers from 
the previous attempt. In order to ensure correctness, we handle this by 
detecting these changes and restarting all the containers in case of any 
changes to metadata (job model or configuration).
    
    Changes:
    
    Detect changes in metadata by reading the older metadata from the 
coordinator stream and signal the CPM
    As part of resource request & orchestration, ignore the containers that are 
already running from the previous attempt and proceed to release them if 
metadata changed.
    Releasing the container will signal the RM through the AMRM client, and the 
RM will orchestrate killing the processing container. It is different from the 
normal StopStreamProcessor flow as the NMClient isn't the source of truth and 
doesn't have context about the containers spun up in the previous attempts.
---
 .../clustermanager/ClusterBasedJobCoordinator.java |  74 ++++++++++++--
 .../clustermanager/ContainerProcessManager.java    |  24 +++--
 .../coordinator/JobCoordinatorMetadataManager.java |  94 +++++++++++------
 .../TestClusterBasedJobCoordinator.java            |  80 ++++++++++++++-
 .../TestContainerPlacementActions.java             |  10 +-
 .../TestContainerProcessManager.java               |  76 ++++++++++++--
 .../TestJobCoordinatorMetadataManager.java         |  50 +++++++++-
 .../samza/job/yarn/YarnClusterResourceManager.java |  25 ++++-
 .../job/yarn/TestYarnClusterResourceManager.java   | 111 +++++++++++++--------
 9 files changed, 443 insertions(+), 101 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index b98c727..08bcfda 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -38,6 +38,7 @@ import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
+import org.apache.samza.coordinator.JobCoordinatorMetadataManager;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.PartitionChangeException;
@@ -47,6 +48,8 @@ import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStrea
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import 
org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -64,7 +67,6 @@ import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Implements a JobCoordinator that is completely independent of the 
underlying cluster
  * manager system. This {@link ClusterBasedJobCoordinator} handles 
functionality common
@@ -170,6 +172,11 @@ public class ClusterBasedJobCoordinator {
    */
   private JmxServer jmxServer;
 
+  /*
+   * Denotes if the metadata changed across application attempts. Used only if 
job coordinator high availability is enabled
+   */
+  private boolean metadataChangedAcrossAttempts = false;
+
   /**
    * Variable to keep the callback exception
    */
@@ -208,11 +215,11 @@ public class ClusterBasedJobCoordinator {
     this.localityManager =
         new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetContainerHostMapping.TYPE));
 
-    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+    if (isApplicationMasterHighAvailabilityEnabled()) {
       ExecutionContainerIdManager executionContainerIdManager = new 
ExecutionContainerIdManager(
           new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetExecutionEnvContainerIdMapping.TYPE));
-
       
state.processorToExecutionId.putAll(executionContainerIdManager.readExecutionEnvironmentContainerIdMapping());
+      generateAndUpdateJobCoordinatorMetadata(jobModelManager.jobModel());
     }
     // build metastore for container placement messages
     containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(metadataStore);
@@ -260,8 +267,12 @@ public class ClusterBasedJobCoordinator {
       MetadataResourceUtil metadataResourceUtil = new 
MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints if startpoints is enabled
-      if (new JobConfig(config).getStartpointEnabled()) {
+      /*
+       * We fanout startpoint if and only if
+       *  1. Startpoint is enabled in configuration
+       *  2. If AM HA is enabled, fanout only if startpoint enabled and job 
coordinator metadata changed
+       */
+      if (shouldFanoutStartpoint()) {
         StartpointManager startpointManager = createStartpointManager();
         startpointManager.start();
         try {
@@ -332,6 +343,24 @@ public class ClusterBasedJobCoordinator {
   }
 
   /**
+   * Generate the job coordinator metadata for current application attempt and 
checks for changes in the
+   * metadata from the previous attempt and writes the updates metadata to 
coordinator stream.
+   *
+   * @param jobModel job model used to generate the job coordinator metadata
+   */
+  @VisibleForTesting
+  void generateAndUpdateJobCoordinatorMetadata(JobModel jobModel) {
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = 
createJobCoordinatorMetadataManager();
+
+    JobCoordinatorMetadata previousMetadata = 
jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    JobCoordinatorMetadata newMetadata = 
jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);
+    if (jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)) {
+      jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+      metadataChangedAcrossAttempts = true;
+    }
+  }
+
+  /**
    * Stops all components of the JobCoordinator.
    */
   private void onShutDown() {
@@ -456,6 +485,39 @@ public class ClusterBasedJobCoordinator {
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, 
containerPlacementMetadataStore, localityManager);
+    return new ContainerProcessManager(config, state, metrics, 
containerPlacementMetadataStore, localityManager,
+        metadataChangedAcrossAttempts);
+  }
+
+  @VisibleForTesting
+  JobCoordinatorMetadataManager createJobCoordinatorMetadataManager() {
+    return new JobCoordinatorMetadataManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore,
+        SetJobCoordinatorMetadataMessage.TYPE), 
JobCoordinatorMetadataManager.ClusterType.YARN, metrics);
+  }
+
+  @VisibleForTesting
+  boolean isApplicationMasterHighAvailabilityEnabled() {
+    return new JobConfig(config).getApplicationMasterHighAvailabilityEnabled();
+  }
+
+  @VisibleForTesting
+  boolean isMetadataChangedAcrossAttempts() {
+    return metadataChangedAcrossAttempts;
+  }
+
+  /**
+   * We only fanout startpoint if and only if
+   *  1. Startpoint is enabled
+   *  2. If AM HA is enabled, fanout only if startpoint enabled and job 
coordinator metadata changed
+   *
+   * @return true if it satisfies above conditions, false otherwise
+   */
+  @VisibleForTesting
+  boolean shouldFanoutStartpoint() {
+    JobConfig jobConfig = new JobConfig(config);
+    boolean startpointEnabled = jobConfig.getStartpointEnabled();
+
+    return isApplicationMasterHighAvailabilityEnabled() ?
+        startpointEnabled && isMetadataChangedAcrossAttempts() : 
startpointEnabled;
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index ec52d4b..995cf7d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -131,12 +131,14 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
    */
   private final Map<String, ProcessorFailure> processorFailures = new 
HashMap<>();
 
+  private final boolean restartContainers;
+
   private ContainerProcessManagerMetrics containerProcessManagerMetrics;
   private JvmMetrics jvmMetrics;
   private Map<String, MetricsReporter> metricsReporters;
 
   public ContainerProcessManager(Config config, SamzaApplicationState state, 
MetricsRegistryMap registry,
-      ContainerPlacementMetadataStore metadataStore, LocalityManager 
localityManager) {
+      ContainerPlacementMetadataStore metadataStore, LocalityManager 
localityManager, boolean restartContainers) {
     Preconditions.checkNotNull(localityManager, "Locality manager cannot be 
null");
     this.state = state;
     this.clusterManagerConfig = new ClusterManagerConfig(config);
@@ -175,6 +177,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
     this.containerAllocator = new 
ContainerAllocator(this.clusterResourceManager, config, state, 
hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
+    this.restartContainers = restartContainers;
     LOG.info("Finished container process manager initialization.");
   }
 
@@ -185,7 +188,8 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       ClusterResourceManager resourceManager,
       Optional<ContainerAllocator> allocator,
       ContainerManager containerManager,
-      LocalityManager localityManager) {
+      LocalityManager localityManager,
+      boolean restartContainers) {
     this.state = state;
     this.clusterManagerConfig = clusterManagerConfig;
     this.jobConfig = new JobConfig(clusterManagerConfig);
@@ -200,6 +204,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       () -> new ContainerAllocator(this.clusterResourceManager, 
clusterManagerConfig, state,
           hostAffinityEnabled, this.containerManager));
     this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
+    this.restartContainers = restartContainers;
     LOG.info("Finished container process manager initialization");
   }
 
@@ -248,16 +253,23 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
-    
state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) 
-> {
-      String host = 
Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+    
state.jobModelManager.jobModel().getContainers().keySet().forEach((processorId) 
-> {
+      String host = 
Optional.ofNullable(localityModel.getProcessorLocality(processorId))
           .map(ProcessorLocality::host)
           .filter(StringUtils::isNotBlank)
           .orElse(null);
-      processorToHost.put(containerId, host);
+      processorToHost.put(processorId, host);
     });
     if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
       // don't request resource for container that is already running
-      state.runningProcessors.keySet().forEach(processorToHost::remove);
+      state.runningProcessors.forEach((processorId, samzaResource) -> {
+        LOG.info("Not requesting container for processorId: {} since its 
already running as containerId: {}",
+            processorId, samzaResource.getContainerId());
+        processorToHost.remove(processorId);
+        if (restartContainers) {
+          clusterResourceManager.stopStreamProcessor(samzaResource);
+        }
+      });
     }
     containerAllocator.requestResources(processorToHost);
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
index c5d72f5..c4540a6 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
@@ -54,12 +54,18 @@ public class JobCoordinatorMetadataManager {
   private static final String JOB_COORDINATOR_MANAGER_METRICS = 
"job-coordinator-manager";
   private static final String JOB_MODEL_CHANGED = "jobModelChanged";
   private static final String CONFIG_CHANGED = "configChanged";
+  private static final String METADATA_GENERATION_FAILED_COUNT = 
"metadataGenerationFailedCount";
+  private static final String METADATA_READ_FAILED_COUNT = 
"metadataReadFailedCount";
+  private static final String METADATA_WRITE_FAILED_COUNT = 
"metadataWriteFailedCount";
   private static final String NEW_DEPLOYMENT = "newDeployment";
 
   static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
   static final String CONTAINER_ID_DELIMITER = "_";
 
   private final Counter applicationAttemptCount;
+  private final Counter metadataGenerationFailedCount;
+  private final Counter metadataReadFailedCount;
+  private final Counter metadataWriteFailedCount;
   private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
   private final Gauge<Integer> configChangedAcrossApplicationAttempt;
   private final Gauge<Integer> newDeployment;
@@ -68,17 +74,30 @@ public class JobCoordinatorMetadataManager {
   private final Serde<String> valueSerde;
   private final ClusterType clusterType;
 
-  public JobCoordinatorMetadataManager(MetadataStore metadataStore, 
ClusterType clusterType, MetricsRegistry metricsRegistry) {
+  public JobCoordinatorMetadataManager(MetadataStore metadataStore, 
ClusterType clusterType,
+      MetricsRegistry metricsRegistry) {
+    this(metadataStore, clusterType, metricsRegistry,
+        new 
CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE));
+  }
+
+  @VisibleForTesting
+  JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType 
clusterType, MetricsRegistry metricsRegistry,
+      Serde<String> valueSerde) {
     Preconditions.checkNotNull(clusterType, "Cluster type cannot be null");
+
     this.clusterType = clusterType;
     this.metadataStore = metadataStore;
-    this.valueSerde = new 
CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE);
+    this.valueSerde = valueSerde;
 
     applicationAttemptCount = 
metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, 
APPLICATION_ATTEMPT_COUNT);
     configChangedAcrossApplicationAttempt =
         metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
CONFIG_CHANGED, 0);
     jobModelChangedAcrossApplicationAttempt =
         metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
JOB_MODEL_CHANGED, 0);
+    metadataGenerationFailedCount = 
metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS,
+        METADATA_GENERATION_FAILED_COUNT);
+    metadataReadFailedCount = 
metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, 
METADATA_READ_FAILED_COUNT);
+    metadataWriteFailedCount = 
metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, 
METADATA_WRITE_FAILED_COUNT);
     newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
NEW_DEPLOYMENT, 0);
   }
 
@@ -121,8 +140,9 @@ public class JobCoordinatorMetadataManager {
       return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), 
String.valueOf(configId),
           String.valueOf(jobModelId));
     } catch (Exception e) {
+      metadataGenerationFailedCount.inc();
       LOG.error("Failed to generate metadata for the current attempt due to ", 
e);
-      throw new RuntimeException("Failed to generate the metadata for the 
current attempt due to ", e);
+      throw new SamzaException("Failed to generate the metadata for the 
current attempt due to ", e);
     }
   }
 
@@ -179,6 +199,7 @@ public class JobCoordinatorMetadataManager {
           metadata = metadataMapper.readValue(metadataString, 
JobCoordinatorMetadata.class);
           break;
         } catch (Exception e) {
+          metadataReadFailedCount.inc();
           LOG.error("Failed to read job coordinator metadata due to ", e);
         }
       }
@@ -204,17 +225,59 @@ public class JobCoordinatorMetadataManager {
       metadataStore.put(clusterType.name(), 
valueSerde.toBytes(metadataValueString));
       LOG.info("Successfully written job coordinator metadata: {} for cluster 
{}.", metadata, clusterType);
     } catch (Exception e) {
+      metadataWriteFailedCount.inc();
       LOG.error("Failed to write the job coordinator metadata to metadata 
store due to ", e);
       throw new SamzaException("Failed to write the job coordinator 
metadata.", e);
     }
   }
 
+  /**
+   * Generate the epoch id using the execution container id that is passed 
through system environment. This isn't ideal
+   * way of generating this ID and we will need some contract between the 
underlying cluster manager and samza engine
+   * around what the epoch ID should be like and what is needed to generate is 
across different cluster offerings.
+   * Due to unknowns defined above, we leave it as is and keep it simple for 
now. It is favorable to keep it this way
+   * instead of introducing a loosely defined interface/API and marking it 
unstable.
+   *
+   * The properties of the epoch identifier are as follows
+   *  1. Unique across applications in the cluster
+   *  2. Remains unchanged within a single deployment lifecycle
+   *  3. Remains unchanged across application attempt within a single 
deployment lifecycle
+   *  4. Changes across deployment lifecycle
+   *
+   *  Note: The above properties is something we want keep intact when 
extracting this into a well defined interface
+   *  or contract for YARN AM HA to work.
+   *  The format and property used to generate ID is specific to YARN and the 
specific format of the container name
+   *  is a public contract by YARN which is likely to remain backward 
compatible.
+   *
+   * @return an identifier associated with the job coordinator satisfying the 
above properties
+   */
+  @VisibleForTesting
+  String fetchEpochIdForJobCoordinator() {
+    String[] containerIdParts = 
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+    return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+  }
+
   @VisibleForTesting
   Counter getApplicationAttemptCount() {
     return applicationAttemptCount;
   }
 
   @VisibleForTesting
+  Counter getMetadataGenerationFailedCount() {
+    return metadataGenerationFailedCount;
+  }
+
+  @VisibleForTesting
+  Counter getMetadataReadFailedCount() {
+    return metadataReadFailedCount;
+  }
+
+  @VisibleForTesting
+  Counter getMetadataWriteFailedCount() {
+    return metadataWriteFailedCount;
+  }
+
+  @VisibleForTesting
   Gauge<Integer> getJobModelChangedAcrossApplicationAttempt() {
     return jobModelChangedAcrossApplicationAttempt;
   }
@@ -235,31 +298,6 @@ public class JobCoordinatorMetadataManager {
   }
 
   /**
-   * Generate the epoch id using the execution container id that is passed 
through system environment. This isn't ideal
-   * way of generating this ID and we will need some contract between the 
underlying cluster manager and samza engine
-   * around what the epoch ID should be like and what is needed to generate is 
across different cluster offerings.
-   * Due to unknowns defined above, we leave it as is and keep it simple for 
now. It is favorable to keep it this way
-   * instead of introducing a loosely defined interface/API and marking it 
unstable.
-   *
-   * The properties of the epoch identifier are as follows
-   *  1. Unique across applications in the cluster
-   *  2. Remains unchanged within a single deployment lifecycle
-   *  3. Remains unchanged across application attempt within a single 
deployment lifecycle
-   *  4. Changes across deployment lifecycle
-   *
-   *  Note: The above properties is something we want keep intact when 
extracting this into a well defined interface
-   *  or contract for YARN AM HA to work.
-   *  The format and property used to generate ID is specific to YARN and the 
specific format of the container name
-   *  is a public contract by YARN which is likely to remain backward 
compatible.
-   *
-   * @return an identifier associated with the job coordinator satisfying the 
above properties
-   */
-  private String fetchEpochIdForJobCoordinator() {
-    String[] containerIdParts = 
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
-    return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
-  }
-
-  /**
    * A helper class to generate hash for the {@link Config} based on with a 
subset of configuration.
    * The subset of configuration used are configurations that prefix match the 
allowed prefixes.
    */
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index caa1ffe..50a1ee1 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -32,11 +32,14 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.JobCoordinatorMetadataManager;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.execution.RemoteJobPlanner;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.system.MockSystemFactory;
@@ -47,7 +50,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.mockito.exceptions.base.MockitoException;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -58,6 +60,8 @@ import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -150,7 +154,7 @@ public class TestClusterBasedJobCoordinator {
     Config config = new MapConfig(configMap);
     MockitoException stopException = new MockitoException("Stop");
 
-    ClusterBasedJobCoordinator clusterCoordinator = 
Mockito.spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config));
+    ClusterBasedJobCoordinator clusterCoordinator = 
spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config));
     ContainerProcessManager mockContainerProcessManager = 
mock(ContainerProcessManager.class);
     doReturn(true).when(mockContainerProcessManager).shouldShutdown();
     StartpointManager mockStartpointManager = mock(StartpointManager.class);
@@ -174,6 +178,43 @@ public class TestClusterBasedJobCoordinator {
   }
 
   @Test
+  public void testVerifyShouldFanoutStartpointWithoutAMHA() {
+    Config jobConfig = new MapConfig(configMap);
+
+    
when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(jobConfig);
+    ClusterBasedJobCoordinator clusterBasedJobCoordinator =
+        
spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(jobConfig));
+
+    
when(clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts()).thenReturn(true);
+    assertTrue("Startpoint should fanout even if metadata changed",
+        clusterBasedJobCoordinator.shouldFanoutStartpoint());
+
+    
when(clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts()).thenReturn(false);
+    assertTrue("Startpoint should fanout even if metadata remains unchanged",
+        clusterBasedJobCoordinator.shouldFanoutStartpoint());
+  }
+
+  @Test
+  public void testVerifyShouldFanoutStartpointWithAMHA() {
+    Config jobConfig = new MapConfig(configMap);
+
+    
when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(jobConfig);
+    ClusterBasedJobCoordinator clusterBasedJobCoordinator =
+        
spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(jobConfig));
+
+    
when(clusterBasedJobCoordinator.isApplicationMasterHighAvailabilityEnabled()).thenReturn(true);
+
+    
when(clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts()).thenReturn(true);
+    assertTrue("Startpoint should fanout with change in metadata",
+        clusterBasedJobCoordinator.shouldFanoutStartpoint());
+
+    
when(clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts()).thenReturn(false);
+    assertFalse("Startpoint fan out shouldn't happen when metadata is 
unchanged",
+        clusterBasedJobCoordinator.shouldFanoutStartpoint());
+
+  }
+
+  @Test
   public void testToArgs() {
     ApplicationConfig appConfig = new ApplicationConfig(new 
MapConfig(ImmutableMap.of(
         JobConfig.JOB_NAME, "test1",
@@ -192,4 +233,39 @@ public class TestClusterBasedJobCoordinator {
     assertEquals(expected.size(), actual.size());
     assertTrue(actual.containsAll(expected));
   }
+
+  @Test
+  public void testGenerateAndUpdateJobCoordinatorMetadata() {
+    Config jobConfig = new MapConfig(configMap);
+    
when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(jobConfig);
+    ClusterBasedJobCoordinator clusterBasedJobCoordinator =
+        
spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(jobConfig));
+
+    JobCoordinatorMetadata previousMetadata = 
mock(JobCoordinatorMetadata.class);
+    JobCoordinatorMetadata newMetadata = mock(JobCoordinatorMetadata.class);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = 
mock(JobCoordinatorMetadataManager.class);
+    JobModel mockJobModel = mock(JobModel.class);
+
+    
when(jobCoordinatorMetadataManager.readJobCoordinatorMetadata()).thenReturn(previousMetadata);
+    when(jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(any(), 
any())).thenReturn(newMetadata);
+    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)).thenReturn(false);
+    
when(clusterBasedJobCoordinator.createJobCoordinatorMetadataManager()).thenReturn(jobCoordinatorMetadataManager);
+
+    /*
+     * Verify if there are no changes to metadata, the metadata changed flag 
remains false and no interactions
+     * with job coordinator metadata manager
+     */
+    
clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
+    assertFalse("JC metadata changed should remain unchanged",
+        clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
+    verify(jobCoordinatorMetadataManager, 
times(0)).writeJobCoordinatorMetadata(any());
+
+    /*
+     * Verify if there are changes to metadata, we persist the new metadata & 
update the metadata changed flag
+     */
+    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)).thenReturn(true);
+    
clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
+    assertTrue("JC metadata changed should be true", 
clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
+    verify(jobCoordinatorMetadataManager, 
times(1)).writeJobCoordinatorMetadata(newMetadata);
+  }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 53bd5b0..c781f4d 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -150,7 +150,7 @@ public class TestContainerPlacementActions {
     containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
-            clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, localityManager);
+            clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, localityManager, false);
   }
 
   @After
@@ -176,7 +176,7 @@ public class TestContainerPlacementActions {
     containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, true, mockLocalityManager));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, mockLocalityManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, mockLocalityManager, false);
   }
 
   @Test(timeout = 10000)
@@ -556,7 +556,7 @@ public class TestContainerPlacementActions {
     containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, localityManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, localityManager, false);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
@@ -674,7 +674,7 @@ public class TestContainerPlacementActions {
 
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), 
getConfigWithHostAffinityAndRetries(false, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, 
Optional.of(allocatorWithoutHostAffinity), containerManager, localityManager);
+        new MetricsRegistryMap(), clusterResourceManager, 
Optional.of(allocatorWithoutHostAffinity), containerManager, localityManager, 
false);
 
     // Mimic Cluster Manager returning any request
     doAnswer(new Answer<Void>() {
@@ -807,7 +807,7 @@ public class TestContainerPlacementActions {
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, 
config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), 
getConfigWithHostAffinityAndRetries(true, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, 
Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
+        new MetricsRegistryMap(), clusterResourceManager, 
Optional.of(allocatorWithHostAffinity), containerManager, localityManager, 
false);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index d285f9e..bcbe53f 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -171,7 +171,8 @@ public class TestContainerProcessManager {
         clusterResourceManager,
         Optional.empty(),
         containerManager,
-        mockLocalityManager
+        mockLocalityManager,
+        false
     );
 
     allocator =
@@ -272,11 +273,63 @@ public class TestContainerProcessManager {
 
     // Verify only 1 was requested with allocator
     assertEquals(1, allocator.requestedContainers);
+    assertTrue("Ensure no processors were forcefully restarted", 
callback.resourceStatuses.isEmpty());
 
     cpm.stop();
   }
 
   @Test
+  public void testOnInitToForceRestartAMHighAvailability() throws Exception {
+    Map<String, String> configMap = new HashMap<>(configVals);
+    configMap.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, "true");
+    Config conf = new MapConfig(configMap);
+    SamzaResource samzaResource = new SamzaResource(1, 1024, "host", "0");
+
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(2));
+    state.runningProcessors.put("0", samzaResource);
+
+    MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
+    ClusterResourceManager clusterResourceManager = spy(new 
MockClusterResourceManager(callback, state));
+    ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
+    ContainerManager containerManager =
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
+
+    ContainerProcessManager cpm =
+        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.empty(), true);
+
+    MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
+        clusterResourceManager,
+        conf,
+        state,
+        containerManager);
+
+    getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
+    CountDownLatch latch = new CountDownLatch(1);
+    getPrivateFieldFromCpm("allocatorThread", cpm).set(cpm, new Thread() {
+      public void run() {
+        isRunning = true;
+        latch.countDown();
+      }
+    });
+
+    cpm.start();
+
+    if (!latch.await(2, TimeUnit.SECONDS)) {
+      Assert.fail("timed out waiting for the latch to expire");
+    }
+
+    verify(clusterResourceManager, 
times(1)).stopStreamProcessor(samzaResource);
+    assertEquals("CPM should stop the running container", 1, 
callback.resourceStatuses.size());
+
+    SamzaResourceStatus actualResourceStatus = 
callback.resourceStatuses.get(0);
+    assertEquals("Container 0 should be stopped", "0", 
actualResourceStatus.getContainerId());
+    assertEquals("Container 0 should have exited with preempted status", 
SamzaResourceStatus.PREEMPTED,
+        actualResourceStatus.getExitCode());
+    cpm.stop();
+  }
+
+  @Test
   public void testOnShutdown() throws Exception {
     Config conf = getConfig();
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
@@ -560,7 +613,8 @@ public class TestContainerProcessManager {
         containerManager);
 
     ContainerProcessManager cpm =
-        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator), mockLocalityManager);
+        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator),
+            mockLocalityManager, false);
 
     // start triggers a request
     cpm.start();
@@ -715,7 +769,7 @@ public class TestContainerProcessManager {
 
     ContainerProcessManager manager =
         new ContainerProcessManager(new ClusterManagerConfig(config), state, 
new MetricsRegistryMap(), clusterResourceManager,
-            Optional.of(allocator), containerManager, mockLocalityManager);
+            Optional.of(allocator), containerManager, mockLocalityManager, 
false);
 
     manager.start();
     SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -751,7 +805,8 @@ public class TestContainerProcessManager {
         containerManager);
 
     ContainerProcessManager cpm =
-        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, 
clusterResourceManager, Optional.of(allocator), mockLocalityManager));
+        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, 
clusterResourceManager,
+            Optional.of(allocator), mockLocalityManager, false));
 
     cpm.start();
     assertFalse(cpm.shouldShutdown());
@@ -989,15 +1044,22 @@ public class TestContainerProcessManager {
   }
   private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator) {
+    return buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, allocator, false);
+  }
+
+  private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
+      ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator, boolean restartContainer) {
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new 
HashMap<>()));
-    return buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, allocator, mockLocalityManager);
+    return buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, allocator,
+        mockLocalityManager, restartContainer);
   }
 
   private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
-      ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator, LocalityManager localityManager) {
+      ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator, LocalityManager localityManager,
+      boolean restartContainers) {
     return new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(), clusterResourceManager,
         allocator, buildContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager,
-        clusterManagerConfig.getHostAffinityEnabled(), false, 
localityManager), localityManager);
+        clusterManagerConfig.getHostAffinityEnabled(), false, 
localityManager), localityManager, restartContainers);
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
index bd177cc..70e65a3 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
@@ -29,6 +29,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
 import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.model.ContainerModel;
@@ -36,6 +37,7 @@ import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.Serde;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -44,7 +46,9 @@ import static 
org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAIN
 import static 
org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_PROPERTY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doThrow;
@@ -144,6 +148,21 @@ public class TestJobCoordinatorMetadataManager {
   }
 
   @Test
+  public void testGenerateJobCoordinatorMetadataFailed() {
+    doThrow(new RuntimeException("Failed to generate epoch id"))
+        .when(jobCoordinatorMetadataManager).fetchEpochIdForJobCoordinator();
+
+    try {
+      jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(new 
JobModel(OLD_CONFIG, containerModelMap), OLD_CONFIG);
+      fail("Expected generate job coordinator metadata to throw exception");
+    } catch (Exception e) {
+      assertTrue("Expecting SamzaException to be thrown", e instanceof 
SamzaException);
+      assertEquals("Metadata generation failed count should be 1", 1,
+          
jobCoordinatorMetadataManager.getMetadataGenerationFailedCount().getCount());
+    }
+  }
+
+  @Test
   public void testGenerateJobCoordinatorMetadataForRepeatability() {
     when(jobCoordinatorMetadataManager.getEnvProperty(CONTAINER_ID_PROPERTY))
         .thenReturn(OLD_CONTAINER_ID);
@@ -175,6 +194,24 @@ public class TestJobCoordinatorMetadataManager {
   }
 
   @Test
+  public void testReadJobCoordinatorMetadataFailed() {
+    JobCoordinatorMetadata jobCoordinatorMetadata =
+        new JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, 
NEW_JOB_MODEL_ID);
+    Serde<String> mockSerde = spy(new 
CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE));
+    doThrow(new RuntimeException("Failed to read coordinator stream"))
+        .when(mockSerde).fromBytes(any());
+
+    jobCoordinatorMetadataManager = spy(new 
JobCoordinatorMetadataManager(metadataStore,
+        ClusterType.YARN, new MetricsRegistryMap(), mockSerde));
+    
jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(jobCoordinatorMetadata);
+
+    JobCoordinatorMetadata actualMetadata = 
jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    assertNull("Read failed should return null", actualMetadata);
+    assertEquals("Metadata read failed count should be 1", 1,
+        jobCoordinatorMetadataManager.getMetadataReadFailedCount().getCount());
+  }
+
+  @Test
   public void testReadWriteJobCoordinatorMetadata() {
     JobCoordinatorMetadata jobCoordinatorMetadata =
         new JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, 
NEW_JOB_MODEL_ID);
@@ -190,10 +227,17 @@ public class TestJobCoordinatorMetadataManager {
     jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(null);
   }
 
-  @Test (expected = SamzaException.class)
+  @Test
   public void testWriteJobCoordinatorMetadataBubblesException() {
-    doThrow(new RuntimeException("failed to write to coordinator stream"))
+    doThrow(new RuntimeException("Failed to write to coordinator stream"))
         .when(metadataStore).put(anyString(), any());
-    
jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(mock(JobCoordinatorMetadata.class));
+    try {
+      
jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(mock(JobCoordinatorMetadata.class));
+      fail("Expected write job coordinator metadata to throw exception");
+    } catch (Exception e) {
+      assertTrue("Expecting SamzaException to be thrown", e instanceof 
SamzaException);
+      assertEquals("Metadata write failed count should be 1", 1,
+          
jobCoordinatorMetadataManager.getMetadataWriteFailedCount().getCount());
+    }
   }
 }
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index d1c5437..fa784e0 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.job.yarn;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
 import java.util.Set;
 import org.apache.hadoop.fs.FileStatus;
@@ -331,9 +332,26 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      String containerId = resource.getContainerId();
+      String containerHost = resource.getHost();
+      /*
+       * 1. Stop the container through NMClient if the container was 
instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned 
container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA
+       * 3. Ignore the request if the container associated with the resource 
isn't present in the bookkeeping.
+       */
       if (container != null) {
-        log.info("Stopping Container ID: {} on host: {}", 
resource.getContainerId(), resource.getHost());
+        log.info("Stopping Container ID: {} on host: {}", containerId, 
containerHost);
         this.nmClientAsync.stopContainerAsync(container.getId(), 
container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = 
state.runningProcessors.get(getRunningProcessorId(containerId));
+        if (yarnContainer != null) {
+          log.info("Stopping container from previous attempt with Container 
ID: {} on host: {}",
+              containerId, containerHost);
+          amClient.releaseAssignedContainer(yarnContainer.id());
+        } else {
+          log.info("No container with Container ID: {} exists. Ignoring the 
stop request", containerId);
+        }
       }
     }
   }
@@ -746,4 +764,9 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
           "Ignoring notification.", containerId);
     }
   }
+
+  @VisibleForTesting
+  ConcurrentHashMap<SamzaResource, Container> getAllocatedResources() {
+    return allocatedResources;
+  }
 }
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index 08b18e8..89929f7 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -46,6 +46,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -58,19 +59,33 @@ import static org.mockito.Mockito.*;
 
 public class TestYarnClusterResourceManager {
 
+  private YarnConfiguration yarnConfiguration;
+  private Config config;
+  private SamzaAppMasterMetrics metrics;
+  private AMRMClientAsync asyncClient;
+  private SamzaYarnAppMasterLifecycle lifecycle;
+  private SamzaYarnAppMasterService service;
+  private NMClientAsync asyncNMClient;
+  private ClusterResourceManager.Callback callback;
+  private YarnAppState yarnAppState;
+
+  @Before
+  public void setup() {
+    yarnConfiguration = mock(YarnConfiguration.class);
+    config = mock(Config.class);
+    metrics = mock(SamzaAppMasterMetrics.class);
+    asyncClient = mock(AMRMClientAsync.class);
+    lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+    service = mock(SamzaYarnAppMasterService.class);
+    asyncNMClient = mock(NMClientAsync.class);
+    callback = mock(ClusterResourceManager.Callback.class);
+    yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 
8081);
+  }
+
   @Test
   public void testErrorInStartContainerShouldUpdateState() {
     // create mocks
     final int samzaContainerId = 1;
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    Config config = mock(Config.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
-    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), 
"host", 8080, 8081);
-    SamzaYarnAppMasterLifecycle lifecycle = 
mock(SamzaYarnAppMasterLifecycle.class);
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
 
     // start the cluster manager
     YarnClusterResourceManager yarnClusterResourceManager =
@@ -94,16 +109,6 @@ public class TestYarnClusterResourceManager {
 
   @Test
   public void testAllocatedResourceExpiryForYarn() {
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    Config config = mock(Config.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
-    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), 
"host", 8080, 8081);
-    SamzaYarnAppMasterLifecycle lifecycle = 
mock(SamzaYarnAppMasterLifecycle.class);
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
-
     // start the cluster manager
     YarnClusterResourceManager yarnClusterResourceManager =
         new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, 
yarnAppState, lifecycle, service, metrics,
@@ -118,15 +123,7 @@ public class TestYarnClusterResourceManager {
   @Test
   public void testAMShutdownOnRMCallback() throws IOException, YarnException {
     // create mocks
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    Config config = mock(Config.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
-    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), 
"host", 8080, 8081);
     SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new 
SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), 
yarnAppState, asyncClient, false));
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
 
     // start the cluster manager
     YarnClusterResourceManager yarnClusterResourceManager =
@@ -146,15 +143,7 @@ public class TestYarnClusterResourceManager {
   @Test
   public void testAMShutdownThrowingExceptionOnRMCallback() throws 
IOException, YarnException {
     // create mocks
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    Config config = mock(Config.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
-    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), 
"host", 8080, 8081);
     SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new 
SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), 
yarnAppState, asyncClient, false));
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
 
     
doThrow(InvalidApplicationMasterRequestException.class).when(asyncClient).unregisterApplicationMaster(FinalApplicationStatus.FAILED,
 null, null);
 
@@ -174,18 +163,11 @@ public class TestYarnClusterResourceManager {
   }
 
   @Test
-  public void testAMHACallbackInvokedForPreviousAttemptContainers() throws 
IOException, YarnException {
+  public void testAMHACallbackInvokedForPreviousAttemptContainers() {
     String previousAttemptContainerId = "0";
     String previousAttemptYarnContainerId = 
"container_1607304997422_0008_02_000002";
     // create mocks
-    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
-    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
-    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
     YarnAppState yarnAppState = Mockito.spy(new YarnAppState(0, 
mock(ContainerId.class), "host", 8080, 8081));
-    SamzaYarnAppMasterLifecycle lifecycle = 
mock(SamzaYarnAppMasterLifecycle.class);
-    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
-    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
-    ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
 
     ContainerId containerId = mock(ContainerId.class);
     when(containerId.toString()).thenReturn(previousAttemptYarnContainerId);
@@ -222,4 +204,47 @@ public class TestYarnClusterResourceManager {
     SamzaResource samzaResource = samzaResourceArgumentCaptor.getValue();
     assertEquals(previousAttemptYarnContainerId, 
samzaResource.getContainerId());
   }
+
+  @Test
+  public void testStopStreamProcessorForContainerFromPreviousAttempt() {
+    String containerId = "Yarn_Container_id_0";
+    String processorId = "Container_id_0";
+    YarnContainer runningYarnContainer = mock(YarnContainer.class);
+    ContainerId previousRunningContainerId = mock(ContainerId.class);
+    YarnAppState yarnAppState = Mockito.spy(new YarnAppState(0, 
mock(ContainerId.class), "host", 8080, 8081));
+
+    yarnAppState.runningProcessors.put(processorId, runningYarnContainer);
+    when(runningYarnContainer.id()).thenReturn(previousRunningContainerId);
+    when(previousRunningContainerId.toString()).thenReturn(containerId);
+
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, 
yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+
+    SamzaResource containerResourceFromPreviousRun = mock(SamzaResource.class);
+    
when(containerResourceFromPreviousRun.getContainerId()).thenReturn(containerId);
+
+    
yarnClusterResourceManager.stopStreamProcessor(containerResourceFromPreviousRun);
+    verify(asyncClient, 
times(1)).releaseAssignedContainer(previousRunningContainerId);
+  }
+
+  @Test
+  public void testStopStreamProcessorForContainerStartedInCurrentLifecycle() {
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, 
yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+
+    SamzaResource allocatedContainerResource = mock(SamzaResource.class);
+    Container runningContainer = mock(Container.class);
+    ContainerId runningContainerId = mock(ContainerId.class);
+    NodeId runningNodeId = mock(NodeId.class);
+
+    when(runningContainer.getId()).thenReturn(runningContainerId);
+    when(runningContainer.getNodeId()).thenReturn(runningNodeId);
+
+    
yarnClusterResourceManager.getAllocatedResources().put(allocatedContainerResource,
 runningContainer);
+    yarnClusterResourceManager.stopStreamProcessor(allocatedContainerResource);
+
+    verify(asyncNMClient, times(1)).stopContainerAsync(runningContainerId, 
runningNodeId);
+  }
 }
\ No newline at end of file

Reply via email to