weiqingy commented on a change in pull request #1197:
URL: https://github.com/apache/samza/pull/1197#discussion_r588589114



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -184,23 +186,38 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
 
     this.clusterResourceManager = resourceManager;
     this.standbyContainerManager = Optional.empty();
+
     this.diagnosticsManager = Option.empty();
     this.containerAllocator = allocator.orElseGet(
         () -> new ContainerAllocator(this.clusterResourceManager, 
clusterManagerConfig, state,
             hostAffinityEnabled, this.standbyContainerManager));
-    this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
+    if (shouldStartAllocateThread()) {
+      this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
+    }
     LOG.info("Finished container process manager initialization");
   }
 
+  // In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a
+  // separate thread to keep polling the allocated resources to start the container.
+  public boolean shouldStartAllocateThread() {
+    return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager");
+  }
+
   public boolean shouldShutdown() {
-    LOG.debug("ContainerProcessManager state: Completed containers: {}, 
Configured containers: {}, Are there too many failed containers: {}, Is 
allocator thread alive: {}",
-      state.completedProcessors.get(), state.processorCount, 
jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
+    LOG.debug("ContainerProcessManager state: Completed containers: {}, 
Configured containers: {}, Are there too many failed containers: {}",
+      state.completedProcessors.get(), state.processorCount, 
jobFailureCriteriaMet ? "yes" : "no");
+
+    if (shouldStartAllocateThread()) {
+      LOG.debug("Is allocator thread alive: {}", allocatorThread.isAlive() ? 
"yes" : "no");
+    }
 
     if (exceptionOccurred != null) {
       LOG.error("Exception in container process manager", exceptionOccurred);
       throw new SamzaException(exceptionOccurred);
     }
-    return jobFailureCriteriaMet || state.completedProcessors.get() == 
state.processorCount.get() || !allocatorThread.isAlive();
+
+    boolean shouldShutdown = jobFailureCriteriaMet || 
state.completedProcessors.get() == state.processorCount.get();
+    return shouldStartAllocateThread() ? shouldShutdown || 
!allocatorThread.isAlive() : shouldShutdown;

Review comment:
       right. updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to