mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547411491
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -512,7 +529,13 @@ public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest re
"Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
containerID, samzaResource.getHost());
releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);
- checkFaultDomainAwarenessEnabledAndRequestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST);
+ Optional<String> activeContainerHostOpt = getActiveContainerHost(containerID);
+ String activeContainerHost = null;
+ if (activeContainerHostOpt.isPresent()) {
+ activeContainerHost = activeContainerHostOpt.get();
+ }
Review comment:
This can be simplified to:
```
String activeContainerHost = getActiveContainerHost(containerID)
    .orElse(null);
```
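To illustrate why the two forms are equivalent, here is a minimal self-contained sketch. The class `ActiveHostLookupSketch` and its backing map are hypothetical stand-ins for the real state in `StandbyContainerManager`; only `Optional.isPresent`, `Optional.get` and `Optional.orElse` are taken from the JDK.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in, not the actual Samza class.
final class ActiveHostLookupSketch {
  // Assumed mapping of container ID to host, for illustration only.
  private final Map<String, String> hostByContainerId;

  ActiveHostLookupSketch(Map<String, String> hostByContainerId) {
    this.hostByContainerId = hostByContainerId;
  }

  // Stand-in for getActiveContainerHost: empty when no host is known.
  Optional<String> getActiveContainerHost(String containerId) {
    return Optional.ofNullable(hostByContainerId.get(containerId));
  }

  // Verbose form, mirroring the lines added in the diff.
  String hostVerbose(String containerId) {
    Optional<String> hostOpt = getActiveContainerHost(containerId);
    String host = null;
    if (hostOpt.isPresent()) {
      host = hostOpt.get();
    }
    return host;
  }

  // Concise form: orElse(null) yields the value if present, otherwise null.
  String hostConcise(String containerId) {
    return getActiveContainerHost(containerId).orElse(null);
  }
}
```

Both methods return the host when one is known and `null` otherwise, so the local `Optional` variable and the `if` block are not needed.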
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -56,8 +58,15 @@
// Resource-manager, used to stop containers
private ClusterResourceManager clusterResourceManager;
- public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
- ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
+ // FaultDomainManager, used to get fault domain information of different hosts from the cluster manager.
+ private final FaultDomainManager faultDomainManager;
+
+ private final Config config;
Review comment:
Remove the `config` field, since the switch is already persisted in the
boolean.
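A rough sketch of the idea, under stated assumptions: the class name, the config key, and the use of a plain `Map<String, String>` in place of Samza's `Config` are all hypothetical. The point is only that the constructor can read the flag once and keep the boolean, without holding on to the whole config object.

```java
import java.util.Map;

// Hypothetical stand-in, not the actual StandbyContainerManager.
final class StandbyManagerSketch {
  // Hypothetical config key, used only for this illustration.
  static final String FAULT_DOMAIN_AWARE_KEY = "example.fault-domain-aware-standby.enabled";

  // The only state derived from the config: the feature switch itself.
  private final boolean faultDomainAwareStandbyEnabled;

  StandbyManagerSketch(Map<String, String> config) {
    // Read the switch once at construction time; the config is not stored.
    this.faultDomainAwareStandbyEnabled =
        Boolean.parseBoolean(config.getOrDefault(FAULT_DOMAIN_AWARE_KEY, "false"));
  }

  boolean isFaultDomainAwareStandbyEnabled() {
    return faultDomainAwareStandbyEnabled;
  }
}
```

Keeping only the derived boolean narrows the class's state to what it actually uses.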
##########
File path:
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -241,6 +241,11 @@ public void requestResources(SamzaResourceRequest resourceRequest) {
String processorId = resourceRequest.getProcessorId();
String requestId = resourceRequest.getRequestId();
String preferredHost = resourceRequest.getPreferredHost();
+ String[] racks = null;
+ ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+ if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled()) {
+ racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
+ }
Review comment:
What do you think about dropping the control flow here and passing an
empty array instead of null, provided that doesn't change the semantics
in YARN? That eliminates an unnecessary branch, and since faultDomains is
guaranteed to be either empty or populated, the null handling is not
necessary either.
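A minimal sketch of the branch-free mapping, assuming a hypothetical `FaultDomain` class with a `getId()` accessor (modelled on the call in the diff, not copied from Samza). Whether YARN treats an empty racks array the same as `null` is the open question above and is not answered by this sketch.

```java
import java.util.Set;

// Hypothetical stand-in types, for illustration only.
final class RackArraySketch {
  static final class FaultDomain {
    private final String id;

    FaultDomain(String id) {
      this.id = id;
    }

    String getId() {
      return id;
    }
  }

  // Maps fault domains to rack IDs without any branching.
  // An empty set produces a zero-length array rather than null.
  static String[] toRackIds(Set<FaultDomain> faultDomains) {
    return faultDomains.stream().map(FaultDomain::getId).toArray(String[]::new);
  }
}
```

With this shape, the caller never has to distinguish "feature disabled" from "no fault domains"; both arrive as a zero-length array.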
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -436,7 +448,11 @@ void checkFaultDomainAwarenessEnabledAndRequestResource(ContainerAllocator conta
if (resource == null) {
resource = samzaApplicationState.runningProcessors.get(activeContainerId);
}
- return Optional.ofNullable(resource.getHost());
+ if (resource != null) {
+ return Optional.ofNullable(resource.getHost());
+ } else {
+ return Optional.empty();
+ }
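Review comment:
This can be simplified to the following (using `Optional.ofNullable`
rather than `Optional.of`, since `resource` may be null here):
```
return Optional.ofNullable(resource)
    .map(SamzaResource::getHost);
```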
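Note that `Optional.of` throws a `NullPointerException` when given `null`, so a null-safe version of this chain starts from `Optional.ofNullable`. A minimal sketch, with a hypothetical `Resource` class standing in for the real resource type:

```java
import java.util.Optional;

// Hypothetical stand-in types, for illustration only.
final class NullSafeHostSketch {
  static final class Resource {
    private final String host;

    Resource(String host) {
      this.host = host;
    }

    String getHost() {
      return host;
    }
  }

  // Empty when the resource is null, and also empty when its host is null,
  // because Optional.map wraps a null mapping result as Optional.empty().
  static Optional<String> hostOf(Resource resource) {
    return Optional.ofNullable(resource).map(Resource::getHost);
  }
}
```

This covers both cases handled by the explicit `if`/`else` in the diff: a missing resource and a resource without a host.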
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]