lakshmi-manasa-g commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538891696



##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -248,16 +253,23 @@ public void start() {
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
-    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
-      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((processorId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
           .map(ProcessorLocality::host)
           .filter(StringUtils::isNotBlank)
           .orElse(null);
-      processorToHost.put(containerId, host);
+      processorToHost.put(processorId, host);
     });
     if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
       // don't request resource for container that is already running
-      state.runningProcessors.keySet().forEach(processorToHost::remove);
+      state.runningProcessors.forEach((processorId, samzaResource) -> {
+        LOG.info("Not requesting container for processorId: {} since its already running as containerId: {}",
+            processorId, samzaResource.getContainerId());
+        processorToHost.remove(processorId);
+        if (restartContainers) {
+          clusterResourceManager.stopStreamProcessor(samzaResource);

Review comment:
       1. Wait, if we have to restart all containers, why are we asking the allocator for resources for only some of the processors? Is it to avoid a scenario where we spin up a processor with ID 0 while a processor with the same ID from the previous attempt still exists, leading to orphaned-container issues?
   
   2. I feel this would benefit from a log here too.
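A minimal sketch of the branch under discussion, with the log line from point 2 added to the restart path. It assumes, as one possible answer to point 1, that stopped processors are re-requested later through the normal container-completion handling rather than up front. `RunningResource`, `InitialRequestPlanner`, and the `stopper` callback are hypothetical stand-ins for `SamzaResource`, `ContainerProcessManager`, and `clusterResourceManager.stopStreamProcessor`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for SamzaResource: only the container ID is modeled.
final class RunningResource {
  final String containerId;

  RunningResource(String containerId) {
    this.containerId = containerId;
  }
}

final class InitialRequestPlanner {
  /**
   * Returns the processorId -> host entries that still need a container request.
   * Already-running processors are skipped and, if restartContainers is set, handed
   * to the stopper. Log lines are collected in the given list so they can be checked.
   */
  static Map<String, String> plan(Map<String, String> processorToHost,
      Map<String, RunningResource> runningProcessors,
      boolean restartContainers,
      Consumer<RunningResource> stopper,
      List<String> log) {
    Map<String, String> toRequest = new HashMap<>(processorToHost);
    runningProcessors.forEach((processorId, resource) -> {
      log.add(String.format(
          "Not requesting container for processorId: %s since it is already running as containerId: %s",
          processorId, resource.containerId));
      toRequest.remove(processorId);
      if (restartContainers) {
        // The extra log asked for in point 2: make the restart decision visible.
        log.add(String.format("Stopping containerId: %s for processorId: %s since a restart was requested",
            resource.containerId, processorId));
        stopper.accept(resource);
      }
    });
    return toRequest;
  }
}
```

Copying into `toRequest` instead of mutating `processorToHost` in place is a choice of this sketch; the change under review mutates the map directly.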

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -456,6 +482,18 @@ StartpointManager createStartpointManager() {
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
+    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager,
+        metadataChangedAcrossAttempts);
+  }
+
+  @VisibleForTesting
+  JobCoordinatorMetadataManager createJobCoordinatorMetadataManager() {
+    return new JobCoordinatorMetadataManager(new NamespaceAwareCoordinatorStreamStore(metadataStore,
+        SetJobCoordinatorMetadataMessage.TYPE), JobCoordinatorMetadataManager.ClusterType.YARN, metrics);
+  }
+
+  @VisibleForTesting
+  boolean isMetadataChangedAcrossAttempts() {

Review comment:
       Is this only for tests?

##########
File path: 
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by release the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());

Review comment:
       major: Hmm, there is some confusion here. `resource.getContainerId()` returns the YARN container ID, right (of the form `container_1350670447861_0003_01_000001`)? See one example [here](https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java#L488).
   
   But if we look at the code where `runningProcessors` gets populated, its key is the Samza processor ID (of the form `0`); see [here](https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java#L738).
   
   Here, however, you are using the container ID to look up entries in `runningProcessors`.
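To make the key mismatch concrete, here is a minimal sketch with simplified, hypothetical types: a map keyed by Samza processor ID (`"0"`) misses when queried with a YARN container ID. One possible fix, assuming each running-processor entry carries its YARN container ID, is to search the values instead of the keys. `ProcessorLookup` and `RunningContainer` are illustrative names, not Samza classes.

```java
import java.util.Map;
import java.util.Optional;

final class ProcessorLookup {
  // Hypothetical stand-in for YarnContainer: only its YARN container ID is modeled.
  static final class RunningContainer {
    final String containerId;

    RunningContainer(String containerId) {
      this.containerId = containerId;
    }
  }

  // The pattern flagged in the review: the map is keyed by Samza processor ID ("0"),
  // so querying it with a YARN container ID does not find the entry.
  static RunningContainer lookupByKey(Map<String, RunningContainer> runningProcessors, String containerId) {
    return runningProcessors.get(containerId);
  }

  // One possible fix: scan the values for the matching container ID and return the
  // processor ID together with the container. Linear in the number of running processors.
  static Optional<Map.Entry<String, RunningContainer>> lookupByContainerId(
      Map<String, RunningContainer> runningProcessors, String containerId) {
    return runningProcessors.entrySet().stream()
        .filter(entry -> entry.getValue().containerId.equals(containerId))
        .findFirst();
  }
}
```

If this lookup turns out to be frequent, keeping a reverse index from container ID to processor ID next to `runningProcessors` would avoid the linear scan.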

##########
File path: 
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by release the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {

Review comment:
       Should we put this block behind the AM-HA config?
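A minimal sketch of the three-way decision described in the code comment, with the previous-attempt branch gated on the AM-HA flag as suggested. The types are simplified stand-ins (plain sets of container IDs instead of `allocatedResources` and `state.runningProcessors`), and `StopDecision` and its enum values are hypothetical. In the real change, the first action would map to `nmClientAsync.stopContainerAsync(...)` and the second, presumably, to YARN's `AMRMClientAsync.releaseAssignedContainer(ContainerId)`.

```java
import java.util.Set;

final class StopDecision {
  // Hypothetical names for the three cases listed in the code comment under review.
  enum Action { STOP_VIA_NM_CLIENT, RELEASE_VIA_AM_CLIENT, IGNORE }

  /**
   * @param containerId      YARN container ID of the resource being stopped
   * @param launchedByThisAm containers this attempt launched through its NMClient
   * @param inheritedRunning containers carried over from a previous attempt
   * @param amHaEnabled      value of the AM-HA config flag
   */
  static Action decide(String containerId, Set<String> launchedByThisAm,
      Set<String> inheritedRunning, boolean amHaEnabled) {
    if (launchedByThisAm.contains(containerId)) {
      return Action.STOP_VIA_NM_CLIENT;
    }
    // Gate the previous-attempt branch on AM-HA: without AM-HA there should be no
    // inherited containers, so this path is skipped entirely.
    if (amHaEnabled && inheritedRunning.contains(containerId)) {
      return Action.RELEASE_VIA_AM_CLIENT;
    }
    // Case 3: not in the bookkeeping, so the request is ignored.
    return Action.IGNORE;
  }
}
```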



