Hisoka-X commented on code in PR #9833:
URL: https://github.com/apache/seatunnel/pull/9833#discussion_r2333870588


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -343,49 +343,79 @@ private void printExecutionInfo() {
         }
     }
 
-    public synchronized void updateMetrics(Map<TaskLocation, 
SeaTunnelMetricsContext> localMap) {
+    public void updateMetrics(Map<TaskLocation, SeaTunnelMetricsContext> 
localMap) {
         if (localMap == null || localMap.isEmpty()) {
             return;
         }
-        IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap 
=
-                
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+        int partitionCount = 
seaTunnelConfig.getEngineConfig().getJobMetricsPartitionCount();
 
-        HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
-                metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+        IMap<Long, Map<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
+                
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
 
-        if (centralMap == null) {
-            centralMap = new HashMap<>();
-        }
-        centralMap.putAll(localMap);
-        metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
+        Map<Long, Map<TaskLocation, SeaTunnelMetricsContext>> partitioned = 
new HashMap<>();
+        localMap.forEach(
+                (key, value) -> {
+                    long partition = getMetricsImapPartition(key, 
partitionCount);
+                    partitioned.computeIfAbsent(partition, k -> new 
HashMap<>()).put(key, value);
+                });
+
+        partitioned
+                .entrySet()
+                .parallelStream()
+                .forEach(
+                        entry -> {
+                            metricsImap.compute(
+                                    entry.getKey(),
+                                    (k, oldVal) -> {
+                                        if (oldVal == null) oldVal = new 
HashMap<>();
+                                        oldVal.putAll(entry.getValue());
+                                        return oldVal;
+                                    });
+                        });
     }
 
-    public synchronized void removeMetrics(PipelineLocation pipelineLocation) {
-        IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap 
=
+    public void removeMetrics(PipelineLocation pipelineLocation) {
+        IMap<Long, Map<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
                 
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
 
-        HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
-                metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
-        if (centralMap == null) {
-            return;
+        Map<Long, List<TaskLocation>> partitionedTasks = new HashMap<>();
+        for (Map.Entry<Long, Map<TaskLocation, SeaTunnelMetricsContext>> entry 
:
+                metricsImap.entrySet()) {
+            long partition = entry.getKey();
+            List<TaskLocation> tasksToRemove =
+                    entry.getValue().keySet().stream()
+                            .filter(
+                                    t ->
+                                            t.getTaskGroupLocation()
+                                                    .getPipelineLocation()
+                                                    .equals(pipelineLocation))
+                            .collect(Collectors.toList());
+            if (!tasksToRemove.isEmpty()) {
+                partitionedTasks.put(partition, tasksToRemove);
+            }
         }
 
-        List<TaskLocation> taskLocations = getTaskLocations(pipelineLocation, 
centralMap);
-        taskLocations.forEach(centralMap::remove);
-        metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
+        partitionedTasks
+                .entrySet()
+                .parallelStream()
+                .forEach(

Review Comment:
   Why do we iterate over metricsImap twice? Can we do it in a single pass?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to