mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547411491



##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -512,7 +529,13 @@ public void 
checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest re
           "Running standby container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
           containerID, samzaResource.getHost());
       releaseUnstartableContainer(request, samzaResource, preferredHost, 
resourceRequestState);
-      checkFaultDomainAwarenessEnabledAndRequestResource(containerAllocator, 
containerID, ResourceRequestState.ANY_HOST);
+      Optional<String> activeContainerHostOpt = 
getActiveContainerHost(containerID);
+      String activeContainerHost = null;
+      if (activeContainerHostOpt.isPresent()) {
+        activeContainerHost = activeContainerHostOpt.get();
+      }

Review comment:
       can be simplified to 
   ```
   activeContainerHost = getActiveContainerHost(containerID)
      .orElse(null);

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -56,8 +58,15 @@
   // Resource-manager, used to stop containers
   private ClusterResourceManager clusterResourceManager;
 
-  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager, LocalityManager 
localityManager) {
+  // FaultDomainManager, used to get fault domain information of different 
hosts from the cluster manager.
+  private final FaultDomainManager faultDomainManager;
+
+  private final Config config;

Review comment:
       remove `config` since you have already persisted the switch in the 
boolean.

##########
File path: 
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -241,6 +241,11 @@ public void requestResources(SamzaResourceRequest 
resourceRequest) {
     String processorId = resourceRequest.getProcessorId();
     String requestId = resourceRequest.getRequestId();
     String preferredHost = resourceRequest.getPreferredHost();
+    String[] racks = null;
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
+    if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled()) {
+      racks = 
resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
+    }

Review comment:
       What do you think about not having control flow here and pass empty 
array instead of null if it doesn't change things semantically in YARN?
   
   By doing so, you will just eliminate unnecessary control flow and given 
faultDomains is guaranteed to be empty or present, null handling is not 
necessary as well.

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -436,7 +448,11 @@ void 
checkFaultDomainAwarenessEnabledAndRequestResource(ContainerAllocator conta
     if (resource == null) {
       resource = 
samzaApplicationState.runningProcessors.get(activeContainerId);
     }
-    return Optional.ofNullable(resource.getHost());
+    if (resource != null) {
+      return Optional.ofNullable(resource.getHost());
+    } else {
+      return Optional.empty();
+    }

Review comment:
       can be simplified to 
   ```
   return Optional.of(resource)
       .map(SamzaResourceRequest::getHost)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to