lakshmi-manasa-g commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r536411116
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -88,7 +88,7 @@
public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
- boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
+ FaultDomainManager faultDomainManager, boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
Review comment:
Can faultDomainManager be null?
If not, maybe add a notNull check?
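Here is a minimal sketch of the suggested check, assuming the manager is mandatory. The classes are trimmed-down, hypothetical stand-ins for the real `FaultDomainManager` and `ContainerManager`; only the new argument and two flags are kept. `Objects.requireNonNull` comes from the JDK. Guava's `Preconditions.checkNotNull` would work the same way if it is already on the classpath.

```java
import java.util.Objects;

// Hypothetical stand-in for the real FaultDomainManager interface.
interface FaultDomainManager { }

// Trimmed-down, hypothetical sketch of ContainerManager showing only the new argument.
class ContainerManager {
  private final FaultDomainManager faultDomainManager;
  private final boolean hostAffinityEnabled;
  private final boolean standByEnabled;

  ContainerManager(FaultDomainManager faultDomainManager, boolean hostAffinityEnabled, boolean standByEnabled) {
    // Fail fast at construction time instead of with an NPE deep inside a scheduling path.
    this.faultDomainManager = Objects.requireNonNull(faultDomainManager, "faultDomainManager cannot be null");
    this.hostAffinityEnabled = hostAffinityEnabled;
    this.standByEnabled = standByEnabled;
  }
}
```

The manager might be meant to be optional, for example when the feature is disabled. In that case, passing a no-op implementation instead of null keeps the check valid.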
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ * It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ * the same fault domain, and getting the valid fault domains that a standby container can be placed on.
Review comment:
Using it for standby containers is only one of its uses; I feel we should not put this in the interface doc.
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -63,7 +69,11 @@
private final Instant requestTimestamp;
public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
- this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+ this(numCores, memoryMB, preferredHost, processorId, Instant.now(), null);
Review comment:
Empty set instead of null?
However, what does an empty set or null mean? Will the request ignore fault domains if the value is null, but somehow break if it is an empty set?
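One way to settle the null-vs-empty question is sketched below under an assumed semantics. An empty set means "no fault-domain constraint", and null is normalized to that at construction, so callers never see it. `ResourceRequestSketch` is a hypothetical, trimmed-down stand-in for `SamzaResourceRequest`. Fault domains are plain strings for brevity.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, trimmed-down stand-in for SamzaResourceRequest (fault domains modeled as strings).
class ResourceRequestSketch {
  private final String preferredHost;
  private final Instant requestTimestamp;
  // Assumed semantics: an empty set means "no fault-domain constraint" (any domain is acceptable).
  private final Set<String> faultDomains;

  ResourceRequestSketch(String preferredHost) {
    // Default to an empty set rather than null so callers never have to null-check.
    this(preferredHost, Instant.now(), Collections.<String>emptySet());
  }

  ResourceRequestSketch(String preferredHost, Instant requestTimestamp, Set<String> faultDomains) {
    this.preferredHost = preferredHost;
    this.requestTimestamp = requestTimestamp;
    // Null is normalized to "no constraint"; otherwise keep a defensive, unmodifiable copy.
    this.faultDomains = faultDomains == null
        ? Collections.<String>emptySet()
        : Collections.unmodifiableSet(new HashSet<>(faultDomains));
  }

  Set<String> getFaultDomains() {
    return faultDomains;
  }

  // True if a host in the given fault domain may satisfy this request.
  boolean acceptsFaultDomain(String faultDomain) {
    return faultDomains.isEmpty() || faultDomains.contains(faultDomain);
  }
}
```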
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,55 @@
+package org.apache.samza.clustermanager;
+
+public class FaultDomain {
+
+ FaultDomainType type;
+ String id;
Review comment:
These seem to be final. If they are not expected to change, maybe mark them as such.
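Below is a sketch of `FaultDomain` as an immutable value object. `FaultDomainType.RACK` is an assumed constant. The `equals`/`hashCode` pair is included because instances are stored in a `Set<FaultDomain>`. If the PR already defines those methods, only the `final` modifiers are new.

```java
import java.util.Objects;

// Assumed enum; the real FaultDomainType in the PR may define different constants.
enum FaultDomainType { RACK }

// Sketch of FaultDomain as an immutable value object.
final class FaultDomain {
  private final FaultDomainType type;
  private final String id;

  FaultDomain(FaultDomainType type, String id) {
    this.type = type;
    this.id = id;
  }

  FaultDomainType getType() {
    return type;
  }

  String getId() {
    return id;
  }

  // Value-based equality so equal domains collapse to one entry in a Set or Map.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FaultDomain)) {
      return false;
    }
    FaultDomain that = (FaultDomain) o;
    return type == that.type && Objects.equals(id, that.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(type, id);
  }

  @Override
  public String toString() {
    return "FaultDomain{type=" + type + ", id=" + id + "}";
  }
}
```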
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -114,6 +114,11 @@
*/
private final ClusterResourceManager clusterResourceManager;
+ /**
+ * An interface to get information about nodes and the fault domains they reside on.
Review comment:
Minor: "interface" here is a bit confusing. No strong objection if you prefer to keep it.
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
##########
@@ -87,6 +88,12 @@ public ClusterResourceManager(Callback callback) {
*/
public abstract void requestResources(SamzaResourceRequest resourceRequest);
+ /**
+ * Get the node to fault domain map from the cluster resource manager.
+ * @return A map of the nodes to the fault domain they reside in.
+ */
+ public abstract Map<String, String> getNodeToFaultDomainMap();
Review comment:
Minor: prefer "host" to "node", since the rest of the Samza (non-YARN-specific) code uses "host".
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
Review comment:
Can you add a few lines about thread-safety: what an implementor of this interface should guarantee/adhere to, what a user should expect, and how to use it?
It looks like all supported functions are read-only, as in, they are all getters/checks and no writes (setters), so thread-safety might not be a big concern. But it is still worth checking and calling it out.
I am wondering: a cluster is a dynamic system, so hosts/domains might change, and then this manager's internal state is somehow updated by the implementor (?). In such a case, what guarantee does this manager provide? Will it always show the state of the cluster right now, or the last read from the cluster manager? Or some other guarantee, e.g. if a host/domain is down in the cluster, then it will definitely not be part of the get-method return value?
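Here is one possible wording for the requested contract, written as interface Javadoc. The guarantees in it are proposals, not what the PR implements: snapshot consistency, refresh-based freshness, and never-null returns. Fault domains are plain strings, and the method set is trimmed for brevity.

```java
import java.util.Map;
import java.util.Set;

/**
 * Provides fault domain information (for example, racks) for the hosts in the cluster, as reported by
 * the cluster manager (YARN, Kubernetes, etc.). Every host belongs to exactly one fault domain.
 *
 * <p><b>Thread-safety:</b> implementations must be safe for concurrent use. Each read method observes a
 * single consistent snapshot of the host-to-fault-domain mapping, never a partially applied refresh.
 *
 * <p><b>Freshness:</b> results reflect the cluster as of the most recent refresh from the cluster manager,
 * not necessarily its live state. A host that has just left the cluster may still be reported until the
 * next refresh.
 */
interface FaultDomainManager {
  /** @return all fault domains in the current snapshot; never null, possibly empty. */
  Set<String> getAllFaultDomains();

  /** @return an immutable view of the current host-to-fault-domain snapshot; never null. */
  Map<String, String> getHostToFaultDomainMap();
}
```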
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ * It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ * the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
Review comment:
Since it's a new interface, should we mark it as evolving?
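The sketch below shows where the annotation would go. To keep it self-contained, it declares a local `InterfaceStability.Evolving` stand-in. In Samza the equivalent is expected to live in `org.apache.samza.annotation.InterfaceStability`, but that should be confirmed against samza-api. The same annotation would apply to `FaultDomainManagerFactory`.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in so this sketch compiles on its own; Samza's own annotation should be used instead.
final class InterfaceStability {
  private InterfaceStability() { }

  /** Marks an API that may still change in incompatible ways between releases. */
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface Evolving { }
}

/**
 * Gets fault domain information for the hosts in the cluster from the cluster manager (YARN, Kubernetes, etc.).
 */
@InterfaceStability.Evolving
interface FaultDomainManager {
  // Methods omitted; only the annotation placement matters for this sketch.
}
```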
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ * It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ * the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+ /**
+ * This method returns all the fault domain values in a cluster for RUNNING nodes.
Review comment:
1. How do we define RUNNING nodes? Nodes where containers of this job are running, nodes that are healthy in the cluster, or nodes with available resources to run a container?
2. Does it get all domains of the cluster, or only those that a job can access?
3. nit: hosts instead of nodes.
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ * It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ * the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+ /**
+ * This method returns all the fault domain values in a cluster for RUNNING nodes.
+ * @return a set of {@link FaultDomain}s
+ */
+ Set<FaultDomain> getAllFaultDomains();
+
+ /**
+ * This method returns the fault domain a particular node resides on.
+ * @param host the host
+ * @return the {@link FaultDomain}
+ */
+ FaultDomain getFaultDomainOfNode(String host);
+
+ /**
+ * This method checks if the two hostnames provided reside on the same fault domain.
Review comment:
Fault domain is a concept that may be worth spending a couple of lines on: a host can only belong to one domain, a domain has >= 1 hosts, and every host in the cluster belongs to one domain or another.
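These invariants can be illustrated with a plain host-to-domain map, using fault domains as strings for simplicity. A `Map` already gives each host exactly one domain, and inverting it yields every domain with at least one host. `FaultDomainModel` and its methods are hypothetical helpers, not part of the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helpers illustrating the fault domain model (fault domains modeled as strings).
final class FaultDomainModel {
  private FaultDomainModel() { }

  // Inverts host -> domain into domain -> hosts; every resulting domain has at least one host.
  static Map<String, Set<String>> hostsByFaultDomain(Map<String, String> hostToFaultDomain) {
    Map<String, Set<String>> result = new HashMap<>();
    for (Map.Entry<String, String> e : hostToFaultDomain.entrySet()) {
      if (e.getValue() == null) {
        // Every host must belong to some fault domain.
        throw new IllegalArgumentException("host " + e.getKey() + " has no fault domain");
      }
      result.computeIfAbsent(e.getValue(), d -> new TreeSet<>()).add(e.getKey());
    }
    return Collections.unmodifiableMap(result);
  }

  // Two hosts share a fault domain only if both are known and map to the same domain.
  static boolean onSameFaultDomain(Map<String, String> hostToFaultDomain, String host1, String host2) {
    String d1 = hostToFaultDomain.get(host1);
    return d1 != null && d1.equals(hostToFaultDomain.get(host2));
  }
}
```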
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ * It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ * the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+ /**
+ * This method returns all the fault domain values in a cluster for RUNNING nodes.
+ * @return a set of {@link FaultDomain}s
+ */
+ Set<FaultDomain> getAllFaultDomains();
+
+ /**
+ * This method returns the fault domain a particular node resides on.
+ * @param host the host
+ * @return the {@link FaultDomain}
+ */
+ FaultDomain getFaultDomainOfNode(String host);
Review comment:
It would be nice to stick to one of the two, host or node, just to keep it consistent :)
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ * It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ * the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+ /**
+ * This method returns all the fault domain values in a cluster for RUNNING nodes.
+ * @return a set of {@link FaultDomain}s
+ */
+ Set<FaultDomain> getAllFaultDomains();
+
+ /**
+ * This method returns the fault domain a particular node resides on.
+ * @param host the host
+ * @return the {@link FaultDomain}
+ */
+ FaultDomain getFaultDomainOfNode(String host);
+
+ /**
+ * This method checks if the two hostnames provided reside on the same fault domain.
+ * @param host1 hostname
+ * @param host2 hostname
+ * @return true if the hosts exist on the same fault domain
+ */
+ boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+ /**
+ * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+ * @param host The hostname of the active container
+ * @return the set of fault domains on which this active container's standby can be scheduled
Review comment:
1. Hmm, I think this manager is designed specifically for standby containers only. I feel this interface has potential beyond standbys, and hence we should strive to keep it generic, maybe usable for scheduling actives as well?
2. What guarantee does the manager provide to the user of this method? For example, for rack upgrades we would want it to return any domain that is not about to undergo maintenance or currently under maintenance; there are other cases like this too.
3. Actually, adding to #2 above: maybe we should be able to define a "rule set" that the domain manager follows to fetch allowed domains. Rules could include things like "not on a particular host/domain" (then this method becomes extensible beyond standby)... wdyt?
4. nit: if sticking to the standby-only use case, indicate it in the name, e.g. `getAllowedFaultDomainsForSchedulingStandbyContainer`.
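Here is a rough sketch of the "rule set" idea from point 3. All names are hypothetical (`AllowedFaultDomainSelector`, `notOnFaultDomainOf`, `notUnderMaintenance`), and fault domains are strings. Each rule is a `BiPredicate` over (candidate domain, reference host), and a domain is allowed only if every rule accepts it. The standby case then becomes just one rule among others.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

// Hypothetical rule-based selection of allowed fault domains (fault domains modeled as strings).
final class AllowedFaultDomainSelector {
  // Each rule answers: is (candidateDomain, referenceHost) allowed?
  private final List<BiPredicate<String, String>> rules;

  AllowedFaultDomainSelector(List<BiPredicate<String, String>> rules) {
    this.rules = Collections.unmodifiableList(rules);
  }

  // Returns every known fault domain that passes all rules.
  Set<String> allowedFaultDomains(Set<String> allFaultDomains, String referenceHost) {
    Set<String> allowed = new HashSet<>();
    for (String domain : allFaultDomains) {
      if (rules.stream().allMatch(rule -> rule.test(domain, referenceHost))) {
        allowed.add(domain);
      }
    }
    return allowed;
  }

  // Example rule for the standby use case: exclude the reference host's own fault domain.
  static BiPredicate<String, String> notOnFaultDomainOf(Map<String, String> hostToFaultDomain) {
    return (domain, host) -> !domain.equals(hostToFaultDomain.get(host));
  }

  // Example rule: exclude domains under (or about to undergo) maintenance.
  static BiPredicate<String, String> notUnderMaintenance(Set<String> maintenanceDomains) {
    return (domain, host) -> !maintenanceDomains.contains(domain);
  }
}
```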
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,55 @@
+package org.apache.samza.clustermanager;
+
+public class FaultDomain {
+
+ FaultDomainType type;
+ String id;
+
+ public FaultDomain(FaultDomainType type, String id) {
+ this.type = type;
+ this.id = id;
Review comment:
Can they be null?
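If the answer is "no", the constructor could reject nulls up front. Here is a minimal sketch, with `FaultDomainType.RACK` as an assumed constant:

```java
import java.util.Objects;

// Assumed enum; the real FaultDomainType may define different constants.
enum FaultDomainType { RACK }

final class FaultDomain {
  private final FaultDomainType type;
  private final String id;

  FaultDomain(FaultDomainType type, String id) {
    // Fail fast so every FaultDomain stored in a Set or Map has a usable type and id.
    this.type = Objects.requireNonNull(type, "type cannot be null");
    this.id = Objects.requireNonNull(id, "id cannot be null");
  }

  FaultDomainType getType() {
    return type;
  }

  String getId() {
    return id;
  }
}
```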
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ * It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ * the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+ /**
+ * This method returns all the fault domain values in a cluster for RUNNING nodes.
+ * @return a set of {@link FaultDomain}s
+ */
+ Set<FaultDomain> getAllFaultDomains();
+
+ /**
+ * This method returns the fault domain a particular node resides on.
+ * @param host the host
+ * @return the {@link FaultDomain}
+ */
+ FaultDomain getFaultDomainOfNode(String host);
+
+ /**
+ * This method checks if the two hostnames provided reside on the same fault domain.
+ * @param host1 hostname
+ * @param host2 hostname
+ * @return true if the hosts exist on the same fault domain
+ */
+ boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+ /**
+ * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+ * @param host The hostname of the active container
+ * @return the set of fault domains on which this active container's standby can be scheduled
+ */
+ Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host);
+
+ /**
+ * This method returns the cached map of nodes to fault domains.
+ * @return stored map of node to the fault domain it resides on
Review comment:
It may not be cached either, right? That depends on the implementation. This is where the guarantee about the manager's view of the cluster comes into play.
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ * It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ * the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+ /**
+ * This method returns all the fault domain values in a cluster for RUNNING nodes.
+ * @return a set of {@link FaultDomain}s
+ */
+ Set<FaultDomain> getAllFaultDomains();
+
+ /**
+ * This method returns the fault domain a particular node resides on.
+ * @param host the host
+ * @return the {@link FaultDomain}
+ */
+ FaultDomain getFaultDomainOfNode(String host);
+
+ /**
+ * This method checks if the two hostnames provided reside on the same fault domain.
+ * @param host1 hostname
+ * @param host2 hostname
+ * @return true if the hosts exist on the same fault domain
+ */
+ boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+ /**
+ * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+ * @param host The hostname of the active container
+ * @return the set of fault domains on which this active container's standby can be scheduled
+ */
+ Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host);
+
+ /**
+ * This method returns the cached map of nodes to fault domains.
+ * @return stored map of node to the fault domain it resides on
+ */
+ Map<String, FaultDomain> getNodeToFaultDomainMap();
+
+ /**
+ * This method computes the node to fault domain map from the cluster resource manager.
+ * @return map of node to the fault domain it resides on
Review comment:
Ah, I realize now: this is the method that updates the manager's fault domain map, right?
So in some sense it is a setter, in which case we should think about thread-safety between this setter and the other getters.
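One common pattern for the setter/getter race is sketched below with hypothetical names. `refresh()` builds a new immutable map and publishes it with a single `volatile` write. Readers therefore see either the old snapshot or the new one, never a mix. The `Supplier` stands in for the call to the cluster resource manager, and fault domains are strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical snapshot-based manager; the Supplier stands in for the cluster resource manager query.
final class SnapshotFaultDomainManager {
  private final Supplier<Map<String, String>> clusterQuery;
  // Immutable snapshot, replaced wholesale on refresh; volatile makes the new reference visible to readers.
  private volatile Map<String, String> hostToFaultDomain = Collections.emptyMap();

  SnapshotFaultDomainManager(Supplier<Map<String, String>> clusterQuery) {
    this.clusterQuery = clusterQuery;
  }

  // The "setter": re-reads the cluster and swaps in a new snapshot with one volatile write.
  Map<String, String> refresh() {
    Map<String, String> fresh = Collections.unmodifiableMap(new HashMap<>(clusterQuery.get()));
    hostToFaultDomain = fresh;
    return fresh;
  }

  // Read the field once so a single call never mixes two snapshots.
  boolean checkHostsOnSameFaultDomain(String host1, String host2) {
    Map<String, String> snapshot = hostToFaultDomain;
    String d1 = snapshot.get(host1);
    return d1 != null && d1.equals(snapshot.get(host2));
  }

  Set<String> getAllFaultDomains() {
    return Collections.unmodifiableSet(new HashSet<>(hostToFaultDomain.values()));
  }

  Map<String, String> getHostToFaultDomainMap() {
    return hostToFaultDomain;
  }
}
```

Concurrent `refresh()` calls are last-writer-wins here. Making `refresh()` `synchronized` would serialize them if that matters.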
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -73,6 +83,18 @@ public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, St
this.requestId = UUID.randomUUID().toString();
this.processorId = processorId;
this.requestTimestamp = requestTimestamp;
+ this.faultDomains = new HashSet<>();
+ log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}", this.processorId, this.preferredHost, this.requestTimestamp, this.requestId);
Review comment:
nit: adding fault domains to the log might be useful for debugging
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java
##########
@@ -0,0 +1,28 @@
+package org.apache.samza.clustermanager;
+
+/**
+ * A factory to build a {@link FaultDomainManager}.
+ */
Review comment:
Same here regarding the interface stability annotation.
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -125,7 +129,8 @@ public void handleContainerLaunchFail(String containerID, String resourceID,
if (StandbyTaskUtil.isStandbyContainer(containerID)) {
log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID);
- containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+ containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST,
+ faultDomainManager.getAllowedFaultDomainsForSchedulingContainer(getActiveContainerHost(containerID)));
Review comment:
Hmm, I'm wondering if we should place all these changes behind a config?
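Here is a sketch of such a gate, off by default. The key `cluster-manager.fault-domain-aware-standby.enabled` is purely illustrative, not an existing Samza setting. An empty set is assumed to mean "no fault-domain constraint". The config is a plain `Map<String, String>`; Samza's `Config` is itself a `Map<String, String>`, so it could be passed directly.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical feature gate for fault-domain-aware standby placement.
final class FaultDomainAwareStandbyGate {
  // Illustrative key only; not an existing Samza config.
  static final String ENABLED_KEY = "cluster-manager.fault-domain-aware-standby.enabled";

  private final boolean enabled;

  FaultDomainAwareStandbyGate(Map<String, String> config) {
    // Off by default, so existing jobs keep the current ANY_HOST behavior unless they opt in.
    this.enabled = Boolean.parseBoolean(config.getOrDefault(ENABLED_KEY, "false"));
  }

  // When disabled, returns an empty set, assumed here to mean "no fault-domain constraint".
  Set<String> faultDomainsForStandby(String activeHost, Function<String, Set<String>> allowedFaultDomains) {
    return enabled ? allowedFaultDomains.apply(activeHost) : Collections.<String>emptySet();
  }
}
```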
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -109,15 +135,24 @@ public String toString() {
", requestId='" + requestId + '\'' +
", processorId=" + processorId +
", requestTimestampMs=" + requestTimestamp +
+ ", faultDomains=" + convertFaultDomainSetToString() +
'}';
}
- /**
- * Requests are ordered by the processor type and the time at which they were created.
- * Requests with timestamps in the future for retries take less precedence than timestamps in the past or current.
- * Otherwise, active processors take precedence over standby processors, regardless of timestamp.
- * @param o the other
- */
+ private String convertFaultDomainSetToString() {
+ StringBuilder faultDomainSb = new StringBuilder();
Review comment:
Just curious: won't it work without this, especially since you already defined "toString" for FaultDomain?
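The reviewer's point can be checked directly. String concatenation calls `Set.toString()`, which (via `AbstractCollection.toString()`) renders `[e1, e2]` using each element's `toString()`. Here is a minimal sketch with a stand-in `FaultDomain` whose type is a plain string; `ToStringDemo` is a hypothetical helper.

```java
import java.util.Set;

// Minimal stand-in for FaultDomain with its own toString(), as the review comment says the PR defines.
final class FaultDomain {
  private final String type;
  private final String id;

  FaultDomain(String type, String id) {
    this.type = type;
    this.id = id;
  }

  @Override
  public String toString() {
    return "FaultDomain{type=" + type + ", id=" + id + "}";
  }
}

final class ToStringDemo {
  private ToStringDemo() { }

  // Concatenation invokes Set.toString(), which delegates to each element's toString().
  static String describe(Set<FaultDomain> faultDomains) {
    return "faultDomains=" + faultDomains;
  }
}
```

The same holds for SLF4J `{}` placeholders, so `faultDomains` could also be passed straight to the `log.info` call in the constructor.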
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -181,13 +186,14 @@ private void handleStandbyContainerStop(String standbyContainerID, String resour
// request standbycontainer's host for active-container
SamzaResourceRequest resourceRequestForActive =
- containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
+ containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
Review comment:
nit: some extra spaces sneaked in :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]