mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545285119
##########
File path:
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -43,20 +42,24 @@
public class YarnFaultDomainManager implements FaultDomainManager {
private Multimap<String, FaultDomain> hostToRackMap;
- private final SamzaApplicationState state;
private final YarnClientImpl yarnClient;
+ private final MetricsRegistry metricsRegistry;
+ private final String groupName = "yarn-fault-domain-manager";
Review comment:
nit: make it static
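A minimal sketch of what this nit suggests: the group name never varies per instance, so it can be a shared class constant. The class and accessor here are illustrative stand-ins, not Samza's actual code.

```java
public class GroupNameSketch {
  // Before: private final String groupName = "yarn-fault-domain-manager";
  // After: a single shared constant, no per-instance copy.
  private static final String GROUP_NAME = "yarn-fault-domain-manager";

  static String groupName() {
    return GROUP_NAME;
  }

  public static void main(String[] args) {
    System.out.println(groupName());
  }
}
```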
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -168,11 +173,13 @@ public void handleContainerStopFail(String containerID, String resourceID,
    * @param host The hostname of the active container
    * @return the set of racks on which this active container's standby can be scheduled
    */
-  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
-    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);
-    Set<FaultDomain> standbyRacks = faultDomainManager.getAllFaultDomains();
-    standbyRacks.removeAll(activeContainerRack);
-    return standbyRacks;
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(Optional<String> host) {
+    Set<FaultDomain> standbyFaultDomain = faultDomainManager.getAllFaultDomains();
+    if (host.isPresent()) {
+      Set<FaultDomain> activeContainerFaultDomain = faultDomainManager.getFaultDomainsForHost(host.get());
+      standbyFaultDomain.removeAll(activeContainerFaultDomain);
+    }
+    return standbyFaultDomain;
Review comment:
Sounds like this method is overloaded and has implicit contracts, e.g., when the incoming host is absent it returns all the fault domains of the cluster, which isn't obvious and shouldn't be implicit.
Either we should rename the method and call out the implication of an empty input parameter, or simplify the method's contract and not overload it.
I'd prefer the latter, since:
1. Getting the allowed fault domains vs. resorting to a fallback may differ at the caller's end.
2. Getting all the fault domains is a straightforward call to `faultDomainManager`.
3. Additionally, Optionals are suitable for return types, but taking an Optional parameter is highly discouraged where possible. Ideally, you want callers to determine how to handle the downstream call in the absence of input data, not the downstream method itself.
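A self-contained sketch of the simplified contract this comment suggests: the method requires a concrete host, and the caller decides what to do when no active host is known. The maps and string-valued fault domains below are stand-ins for Samza's `FaultDomainManager` and `FaultDomain`, not the actual API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class FaultDomainSketch {
  // Stand-ins for faultDomainManager.getAllFaultDomains() and the host-to-domain mapping.
  static Set<String> allFaultDomains = Set.of("rack-1", "rack-2", "rack-3");
  static Map<String, Set<String>> hostToDomains =
      Map.of("host-a", Set.of("rack-1"), "host-b", Set.of("rack-2"));

  // Simple contract: a concrete host is required; no Optional parameter.
  static Set<String> allowedFaultDomainsForStandby(String activeHost) {
    Set<String> allowed = new HashSet<>(allFaultDomains);
    allowed.removeAll(hostToDomains.getOrDefault(activeHost, Set.of()));
    return allowed;
  }

  public static void main(String[] args) {
    // The caller, not the method, handles the "no active host" case.
    Optional<String> activeHost = Optional.of("host-a");
    Set<String> allowed = activeHost
        .map(FaultDomainSketch::allowedFaultDomainsForStandby)
        .orElse(allFaultDomains);
    System.out.println(allowed.contains("rack-1")); // active container's rack is excluded
  }
}
```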
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
     return true;
   }
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+            containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+            containerIdToStart, hostToStartContainerOn, existingContainerID);
Review comment:
Seems like the logging is now impacted and differs from the previous flow in the absence of rack-aware standby, i.e., this method gets invoked with both `pendingResource` and `runningResource`, and both will end up logging "scheduled on this host" (previously the running case logged "running on this host").
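One hypothetical way to address this if the helper is kept: pass the conflicting container's state ("scheduled" vs. "running") into the helper instead of hard-coding it at both call sites. The class and method names below are illustrative, not Samza's actual API.

```java
class ConflictLogSketch {
  // The state parameter preserves the two distinct messages from the original flow.
  static String conflictMessage(String containerIdToStart, String host,
      String existingContainerID, String existingState) {
    return String.format(
        "Container %s cannot be started on host %s because container %s is already %s on this host",
        containerIdToStart, host, existingContainerID, existingState);
  }

  public static void main(String[] args) {
    // Call site for pendingProcessors:
    System.out.println(conflictMessage("c0", "host-a", "c1", "scheduled"));
    // Call site for runningProcessors keeps its distinct wording:
    System.out.println(conflictMessage("c0", "host-a", "c2", "running"));
  }
}
```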
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
     return true;
   }
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+            containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+            containerIdToStart, hostToStartContainerOn, existingContainerID);
Review comment:
Can we not inline this within `checkStandbyConstraints`? There doesn't seem to be much worth extracting, inlining avoids recreating `ClusterManagerConfig` on every invocation, and it also addresses the problem I raised above about the logging being different.
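A self-contained sketch of the inlined shape this comment suggests: the config flag is read once, both checks happen inline, and the original "scheduled" vs. "running" messages are preserved. The maps and `sameRack` predicate below are stand-ins for Samza's `pendingProcessors`/`runningProcessors` maps and `faultDomainManager.hasSameFaultDomains`, not the actual code.

```java
import java.util.List;
import java.util.Map;

class InlinedConstraintSketch {
  static Map<String, String> pendingHostByContainer = Map.of("c1", "host-a");
  static Map<String, String> runningHostByContainer = Map.of("c2", "host-b");
  static boolean faultDomainAwareStandbyEnabled = false; // read once, not per check

  static boolean sameRack(String h1, String h2) {
    return false; // stand-in for faultDomainManager.hasSameFaultDomains
  }

  static boolean checkStandbyConstraints(String containerIdToStart, String host, List<String> constraints) {
    for (String containerID : constraints) {
      String pendingHost = pendingHostByContainer.get(containerID);
      if (pendingHost != null && (pendingHost.equals(host)
          || (faultDomainAwareStandbyEnabled && sameRack(host, pendingHost)))) {
        System.out.printf("Container %s cannot be started on host %s because container %s is already scheduled on this host%n",
            containerIdToStart, host, containerID);
        return false;
      }
      String runningHost = runningHostByContainer.get(containerID);
      if (runningHost != null && (runningHost.equals(host)
          || (faultDomainAwareStandbyEnabled && sameRack(host, runningHost)))) {
        System.out.printf("Container %s cannot be started on host %s because container %s is already running on this host%n",
            containerIdToStart, host, containerID);
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(checkStandbyConstraints("c0", "host-a", List.of("c1", "c2")));
  }
}
```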
##########
File path:
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -117,4 +120,8 @@ public boolean hasSameFaultDomains(String host1, String host2) {
     }
     return hostToRackMap;
   }
+
+  private void initMetrics() {
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(groupName, "host-to-fault-domain-cache-updates");
+  }
Review comment:
why not inline? what are we getting by extracting it to a method?
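A sketch of inlining the counter creation in the constructor instead of a one-line `initMetrics()` method. The `Registry` interface here is a simplified stand-in for Samza's `MetricsRegistry` (modeled as returning the counter's full name so the example is checkable), not the real API.

```java
class MetricsInlineSketch {
  interface Registry {
    String newCounter(String group, String name); // stand-in: returns the counter's full name
  }

  private final String hostToFaultDomainCacheUpdates;

  MetricsInlineSketch(Registry registry) {
    // Inlined: no separate initMetrics() needed for a single counter.
    this.hostToFaultDomainCacheUpdates =
        registry.newCounter("yarn-fault-domain-manager", "host-to-fault-domain-cache-updates");
  }

  String counterName() {
    return hostToFaultDomainCacheUpdates;
  }

  public static void main(String[] args) {
    Registry r = (group, name) -> group + "." + name;
    System.out.println(new MetricsInlineSketch(r).counterName());
  }
}
```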
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]