Hisoka-X commented on code in PR #9833:
URL: https://github.com/apache/seatunnel/pull/9833#discussion_r2341176270
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -343,49 +343,79 @@ private void printExecutionInfo() {
}
}
- public synchronized void updateMetrics(Map<TaskLocation,
SeaTunnelMetricsContext> localMap) {
+ public void updateMetrics(Map<TaskLocation, SeaTunnelMetricsContext>
localMap) {
if (localMap == null || localMap.isEmpty()) {
return;
}
- IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap
=
-
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ int partitionCount =
seaTunnelConfig.getEngineConfig().getJobMetricsPartitionCount();
- HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
- metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ IMap<Long, Map<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
+
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
- if (centralMap == null) {
- centralMap = new HashMap<>();
- }
- centralMap.putAll(localMap);
- metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
+ Map<Long, Map<TaskLocation, SeaTunnelMetricsContext>> partitioned =
new HashMap<>();
+ localMap.forEach(
+ (key, value) -> {
+ long partition = getMetricsImapPartition(key,
partitionCount);
+ partitioned.computeIfAbsent(partition, k -> new
HashMap<>()).put(key, value);
+ });
+
+ partitioned
+ .entrySet()
+ .parallelStream()
+ .forEach(
+ entry -> {
+ metricsImap.compute(
+ entry.getKey(),
+ (k, oldVal) -> {
+ if (oldVal == null) oldVal = new
HashMap<>();
+ oldVal.putAll(entry.getValue());
+ return oldVal;
+ });
+ });
}
- public synchronized void removeMetrics(PipelineLocation pipelineLocation) {
- IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap
=
+ public void removeMetrics(PipelineLocation pipelineLocation) {
+ IMap<Long, Map<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
- HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
- metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
- if (centralMap == null) {
- return;
+ Map<Long, List<TaskLocation>> partitionedTasks = new HashMap<>();
+ for (Map.Entry<Long, Map<TaskLocation, SeaTunnelMetricsContext>> entry
:
+ metricsImap.entrySet()) {
+ long partition = entry.getKey();
+ List<TaskLocation> tasksToRemove =
+ entry.getValue().keySet().stream()
+ .filter(
+ t ->
+ t.getTaskGroupLocation()
+ .getPipelineLocation()
+ .equals(pipelineLocation))
+ .collect(Collectors.toList());
+ if (!tasksToRemove.isEmpty()) {
+ partitionedTasks.put(partition, tasksToRemove);
+ }
}
- List<TaskLocation> taskLocations = getTaskLocations(pipelineLocation,
centralMap);
- taskLocations.forEach(centralMap::remove);
- metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
+ partitionedTasks
+ .entrySet()
+ .parallelStream()
+ .forEach(
Review Comment:
We are a bit limited by imap, maybe we should switch to rocksdb in the
future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]