This is an automated email from the ASF dual-hosted git repository.
govind pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 5daf0cc STORM-3259: Fixes reading of NUMA supervisor assignments
new a2905c8 Merge pull request #3257 from govind-menon/STORM-3259-Containers
5daf0cc is described below
commit 5daf0cce0ddbbb7c6ceee38c5cfec9ba3a858517
Author: Govind Menon <[email protected]>
AuthorDate: Thu Apr 23 13:07:20 2020 -0500
STORM-3259: Fixes reading of NUMA supervisor assignments
---
.../storm/daemon/supervisor/ReadClusterState.java | 4 +-
.../supervisor/timer/SynchronizeAssignments.java | 57 +++++++++++++++++-----
2 files changed, 48 insertions(+), 13 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 7337927..503b44a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -247,7 +247,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap =
assignment.get_worker_resources();
if (nodeInfoWorkerResourcesMap != null) {
for (Map.Entry<NodeInfo, WorkerResources> entry :
nodeInfoWorkerResourcesMap.entrySet()) {
- if (entry.getKey().get_node().equals(assignmentId)) {
+ if (entry.getKey().get_node().startsWith(assignmentId)) {
Set<Long> ports = entry.getKey().get_port();
for (Long port : ports) {
slotsResources.put(port, entry.getValue());
@@ -267,7 +267,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
Map<List<Long>, NodeInfo> executorNodePort =
assignment.get_executor_node_port();
if (executorNodePort != null) {
for (Map.Entry<List<Long>, NodeInfo> entry :
executorNodePort.entrySet()) {
- if (entry.getValue().get_node().equals(assignmentId)) {
+ if (entry.getValue().get_node().startsWith(assignmentId)) {
for (Long port : entry.getValue().get_port()) {
LocalAssignment localAssignment =
portTasks.get(port.intValue());
if (localAssignment == null) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
index 3960ce9..62f73f2 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
@@ -12,12 +12,19 @@
package org.apache.storm.daemon.supervisor.timer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+
+import org.apache.storm.ServerConstants;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.ReadClusterState;
import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.SupervisorAssignments;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.ConfigUtils;
@@ -49,14 +56,21 @@ public class SynchronizeAssignments implements Runnable {
this.readClusterState = readClusterState;
}
- private static void assignedAssignmentsToLocal(IStormClusterState
clusterState, SupervisorAssignments assignments) {
- if (null == assignments) {
+ private static void assignedAssignmentsToLocal(IStormClusterState
clusterState,
+ List<SupervisorAssignments>
supervisorAssignments) {
+ if (null == supervisorAssignments || supervisorAssignments.isEmpty()) {
//unknown error, just skip
return;
}
Map<String, byte[]> serAssignments = new HashMap<>();
- for (Map.Entry<String, Assignment> entry :
assignments.get_storm_assignment().entrySet()) {
- serAssignments.put(entry.getKey(),
Utils.serialize(entry.getValue()));
+ for (SupervisorAssignments supervisorAssignment :
supervisorAssignments) {
+ if (supervisorAssignment == null) {
+ //unknown error, just skip
+ continue;
+ }
+ for (Map.Entry<String, Assignment> entry :
supervisorAssignment.get_storm_assignment().entrySet()) {
+ serAssignments.put(entry.getKey(),
Utils.serialize(entry.getValue()));
+ }
}
clusterState.syncRemoteAssignments(serAssignments);
}
@@ -67,7 +81,7 @@ public class SynchronizeAssignments implements Runnable {
if (null == assignments) {
getAssignmentsFromMaster(this.supervisor.getConf(),
this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
} else {
- assignedAssignmentsToLocal(this.supervisor.getStormClusterState(),
assignments);
+ assignedAssignmentsToLocal(this.supervisor.getStormClusterState(),
Collections.singletonList(assignments));
}
this.readClusterState.run();
}
@@ -81,7 +95,7 @@ public class SynchronizeAssignments implements Runnable {
while (!success) {
try (NimbusClient master =
NimbusClient.getConfiguredClient(supervisor.getConf())) {
SupervisorAssignments assignments =
master.getClient().getSupervisorAssignments(supervisor.getAssignmentId());
- assignedAssignmentsToLocal(supervisor.getStormClusterState(),
assignments);
+ assignedAssignmentsToLocal(supervisor.getStormClusterState(),
Collections.singletonList(assignments));
success = true;
} catch (Exception t) {
// just ignore the exception
@@ -99,6 +113,24 @@ public class SynchronizeAssignments implements Runnable {
}
+ public List<SupervisorAssignments> getAllAssignmentsFromNumaSupervisors(
+ Nimbus.Iface nimbus, String node
+ ) throws TException {
+ List<SupervisorAssignments> supervisorAssignmentsList = new
ArrayList();
+ Map<String, Object> validatedNumaMap =
SupervisorUtils.getNumaMap(supervisor.getConf());
+ for (Map.Entry<String, Object> numaEntry :
validatedNumaMap.entrySet()) {
+ String numaId = numaEntry.getKey();
+ SupervisorAssignments assignments =
nimbus.getSupervisorAssignments(
+ node + ServerConstants.NUMA_ID_SEPARATOR + numaId
+ );
+ supervisorAssignmentsList.add(assignments);
+ }
+ SupervisorAssignments assignments =
nimbus.getSupervisorAssignments(node);
+ supervisorAssignmentsList.add(assignments);
+
+ return supervisorAssignmentsList;
+ }
+
/**
* Used by {@link Supervisor} to fetch assignments when start up.
* @param conf config
@@ -108,16 +140,19 @@ public class SynchronizeAssignments implements Runnable {
public void getAssignmentsFromMaster(Map conf, IStormClusterState
clusterState, String node) {
if (ConfigUtils.isLocalMode(conf)) {
try {
- SupervisorAssignments assignments =
this.supervisor.getLocalNimbus().getSupervisorAssignments(node);
- assignedAssignmentsToLocal(clusterState, assignments);
+ List<SupervisorAssignments> supervisorAssignmentsList =
+ getAllAssignmentsFromNumaSupervisors(
+ this.supervisor.getLocalNimbus(), node
+ );
+ assignedAssignmentsToLocal(clusterState,
supervisorAssignmentsList);
} catch (TException e) {
LOG.error("Get assignments from local master exception", e);
}
} else {
try (NimbusClient master = NimbusClient.getConfiguredClient(conf))
{
- SupervisorAssignments assignments =
master.getClient().getSupervisorAssignments(node);
- LOG.debug("Sync an assignments from master, will start to sync
with assignments: {}", assignments);
- assignedAssignmentsToLocal(clusterState, assignments);
+ List<SupervisorAssignments> supervisorAssignmentsList =
getAllAssignmentsFromNumaSupervisors(master.getClient(), node);
+ LOG.debug("Sync an assignments from master, will start to sync
with assignments: {}", supervisorAssignmentsList);
+ assignedAssignmentsToLocal(clusterState,
supervisorAssignmentsList);
} catch (Exception t) {
LOG.error("Get assignments from master exception", t);
}