lakshmi-manasa-g commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538940474



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -260,8 +267,9 @@ public void run() {
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints if startpoints is enabled
-      if (new JobConfig(config).getStartpointEnabled()) {
+      // fan out the startpoints if startpoints is enabled and if the metadata changed across attempts.
+      // the metadata changed should be false and only get evaluated if job coordinator high availability is enabled.
+      if (new JobConfig(config).getStartpointEnabled() && !metadataChangedAcrossAttempts) {

Review comment:
       What happens if startpoint is enabled and the metadata has changed? Do we just ignore it? That is, for AM-HA with metadata changes, do we not create the startpoint manager? If we are planning another PR for this, then let's leave a comment saying TBD.
   
   Sorry, I missed this in the first pass.
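
To make the question concrete, here is a minimal standalone sketch of the guard in the hunk above. `StartpointGuard` and `shouldFanOut` are hypothetical names used only for illustration and are not part of the PR; only the boolean expression is taken from the diff. One thing the sketch makes visible: the quoted code comment says fan-out happens "if the metadata changed across attempts", while the condition negates that flag, so the comment may need rewording.

```java
// Hypothetical helper, not part of Samza: isolates the guard from the diff
// so its four input combinations can be inspected.
final class StartpointGuard {
    private StartpointGuard() {}

    // Same expression as the diff:
    // getStartpointEnabled() && !metadataChangedAcrossAttempts
    static boolean shouldFanOut(boolean startpointEnabled,
                                boolean metadataChangedAcrossAttempts) {
        return startpointEnabled && !metadataChangedAcrossAttempts;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        for (boolean enabled : values) {
            for (boolean changed : values) {
                System.out.printf(
                    "startpointEnabled=%b metadataChanged=%b -> fanOut=%b%n",
                    enabled, changed, shouldFanOut(enabled, changed));
            }
        }
    }
}
```

The case the review comment asks about is `shouldFanOut(true, true)`, which evaluates to `false`: with startpoints enabled and metadata changed, the fan-out block is skipped.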

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -248,16 +253,23 @@ public void start() {
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
-    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
-      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((processorId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
           .map(ProcessorLocality::host)
           .filter(StringUtils::isNotBlank)
           .orElse(null);
-      processorToHost.put(containerId, host);
+      processorToHost.put(processorId, host);
     });
     if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
       // don't request resource for container that is already running
-      state.runningProcessors.keySet().forEach(processorToHost::remove);
+      state.runningProcessors.forEach((processorId, samzaResource) -> {
+        LOG.info("Not requesting container for processorId: {} since its already running as containerId: {}",
+            processorId, samzaResource.getContainerId());
+        processorToHost.remove(processorId);
+        if (restartContainers) {
+          clusterResourceManager.stopStreamProcessor(samzaResource);

Review comment:
       thanks for clarifying
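
For readers following along, the bookkeeping in this hunk can be sketched without any Samza types. This is only an illustration under stated assumptions: `ContainerRequestPlanner` and `processorsToRequest` are hypothetical names, processors and hosts are plain strings, and the stop-on-restart call from the diff is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Samza code: compute which processors still need
// a container request when some are already running from a previous attempt.
final class ContainerRequestPlanner {
    private ContainerRequestPlanner() {}

    static Map<String, String> processorsToRequest(
            Map<String, String> processorToHost,
            Set<String> runningProcessorIds) {
        // Copy so the caller's map is untouched; HashMap permits null hosts,
        // matching the orElse(null) in the diff.
        Map<String, String> remaining = new HashMap<>(processorToHost);
        // Drop every processor that is already running.
        remaining.keySet().removeAll(runningProcessorIds);
        return remaining;
    }
}
```

For example, with processors `"0"` and `"1"` in the job model and `"0"` already running, only `"1"` remains in the returned map.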

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by release the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());
+        if (yarnContainer != null) {
+          log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
+              resource.getContainerId(), resource.getHost());
+          amClient.releaseAssignedContainer(yarnContainer.id());

Review comment:
       awesome! thanks for telling me. 
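
The three cases listed in the quoted comment can be sketched as a small decision function. This is a simplification under stated assumptions, not the PR's implementation: `StopDispatchSketch`, `Action`, and `decide` are hypothetical names, both bookkeeping structures are reduced to sets keyed by one string ID, and no YARN client is called.

```java
import java.util.Set;

// Hypothetical sketch of the three-way stop decision described in the
// quoted comment. No real NMClient or AMClient calls are made here.
final class StopDispatchSketch {
    private StopDispatchSketch() {}

    enum Action { STOP_VIA_NM_CLIENT, RELEASE_VIA_AM_CLIENT, IGNORE }

    static Action decide(String id,
                         Set<String> allocatedInThisAttempt,
                         Set<String> runningFromPreviousAttempt) {
        if (allocatedInThisAttempt.contains(id)) {
            // Case 1: container was launched in the current attempt.
            return Action.STOP_VIA_NM_CLIENT;
        }
        if (runningFromPreviousAttempt.contains(id)) {
            // Case 2: container survived from a previous attempt (AM-HA).
            return Action.RELEASE_VIA_AM_CLIENT;
        }
        // Case 3: unknown to the bookkeeping, so the request is ignored.
        return Action.IGNORE;
    }
}
```

The order of the checks matters: a container known to the current attempt takes the first path even if it also appears in the second set.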





