This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2e9a1bdb1 [Improve][Zeta] Reduce the operation count of 
imap_running_job_metrics (#4861)
2e9a1bdb1 is described below

commit 2e9a1bdb12094edf3a39a960fb785138ac5b2f17
Author: ic4y <[email protected]>
AuthorDate: Mon Jun 5 16:51:57 2023 +0800

    [Improve][Zeta] Reduce the operation count of imap_running_job_metrics 
(#4861)
---
 .../apache/seatunnel/engine/common/Constant.java   |  2 +
 .../engine/server/CoordinatorService.java          |  8 +++
 .../engine/server/TaskExecutionService.java        | 77 ++++++++++++----------
 .../server/execution/TaskExecutionContext.java     | 10 ++-
 .../seatunnel/engine/server/master/JobMaster.java  | 37 +++++++----
 5 files changed, 86 insertions(+), 48 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 12e13bc19..8d52a444a 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -56,4 +56,6 @@ public class Constant {
     public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map";
 
     public static final String IMAP_RUNNING_JOB_METRICS = 
"engine_runningJobMetrics";
+
+    public static final Long IMAP_RUNNING_JOB_METRICS_KEY = 1L;
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d38f2e028..bf913a424 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -37,9 +37,11 @@ import 
org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.master.JobHistoryService;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import 
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -53,6 +55,7 @@ import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import lombok.NonNull;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -126,6 +129,8 @@ public class CoordinatorService {
      */
     private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> 
ownedSlotProfilesIMap;
 
+    private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> 
metricsImap;
+
     /** If this node is a master node */
     private volatile boolean isActive = false;
 
@@ -191,6 +196,7 @@ public class CoordinatorService {
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
         ownedSlotProfilesIMap =
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
+        metricsImap = 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
 
         jobHistoryService =
                 new JobHistoryService(
@@ -256,6 +262,7 @@ public class CoordinatorService {
                         runningJobStateTimestampsIMap,
                         ownedSlotProfilesIMap,
                         runningJobInfoIMap,
+                        metricsImap,
                         engineConfig);
 
         // If Job Status is CANCELLING , set needRestore to false
@@ -419,6 +426,7 @@ public class CoordinatorService {
                         runningJobStateTimestampsIMap,
                         ownedSlotProfilesIMap,
                         runningJobInfoIMap,
+                        metricsImap,
                         engineConfig);
         executorService.submit(
                 () -> {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 2fe8ee485..dd92e6104 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -134,8 +134,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
 
     private final ScheduledExecutorService scheduledExecutorService;
 
-    private CountDownLatch waitClusterStarted;
-
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties 
properties) {
         seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -493,7 +491,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                                             task.provideDynamicMetrics(copy3, 
context);
                                         });
                     });
-            updateMetricsContextInImap();
         } catch (Throwable t) {
             logger.warning("Dynamic metric collection failed", t);
             throw t;
@@ -501,43 +498,53 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
     }
 
     private synchronized void updateMetricsContextInImap() {
+        if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+            logger.warning(
+                    String.format(
+                            "The Node is not ready yet, Node state %s,looking 
forward to the next "
+                                    + "scheduling",
+                            nodeEngine.getNode().getState()));
+            return;
+        }
+        IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap 
=
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
         Map<TaskGroupLocation, TaskGroupContext> contextMap = new HashMap<>();
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
-        try {
-            if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+        HashMap<TaskLocation, SeaTunnelMetricsContext> localMap = new 
HashMap<>();
+        contextMap.forEach(
+                (taskGroupLocation, taskGroupContext) -> {
+                    taskGroupContext
+                            .getTaskGroup()
+                            .getTasks()
+                            .forEach(
+                                    task -> {
+                                        // MetricsContext only exists in 
SeaTunnelTask
+                                        if (task instanceof SeaTunnelTask) {
+                                            SeaTunnelTask seaTunnelTask = 
(SeaTunnelTask) task;
+                                            if (null != 
seaTunnelTask.getMetricsContext()) {
+                                                localMap.put(
+                                                        
seaTunnelTask.getTaskLocation(),
+                                                        
seaTunnelTask.getMetricsContext());
+                                            }
+                                        }
+                                    });
+                });
+        if (localMap.size() > 0) {
+            try {
+                metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+                HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
+                        metricsImap.computeIfAbsent(
+                                Constant.IMAP_RUNNING_JOB_METRICS_KEY, k -> 
new HashMap<>());
+                centralMap.putAll(localMap);
+                metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, 
centralMap);
+            } catch (Exception e) {
                 logger.warning(
-                        String.format(
-                                "The Node is not ready yet, Node state 
%s,looking forward to the next "
-                                        + "scheduling",
-                                nodeEngine.getNode().getState()));
-                return;
+                        "The Imap acquisition failed due to the hazelcast node 
being offline or restarted, and will be retried next time",
+                        e);
+            } finally {
+                metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
             }
-
-            IMap<TaskLocation, SeaTunnelMetricsContext> map =
-                    
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
-            contextMap.forEach(
-                    (taskGroupLocation, taskGroupContext) -> {
-                        taskGroupContext
-                                .getTaskGroup()
-                                .getTasks()
-                                .forEach(
-                                        task -> {
-                                            // MetricsContext only exists in 
SeaTunnelTask
-                                            if (task instanceof SeaTunnelTask) 
{
-                                                SeaTunnelTask seaTunnelTask = 
(SeaTunnelTask) task;
-                                                if (null != 
seaTunnelTask.getMetricsContext()) {
-                                                    map.put(
-                                                            
seaTunnelTask.getTaskLocation(),
-                                                            
seaTunnelTask.getMetricsContext());
-                                                }
-                                            }
-                                        });
-                    });
-        } catch (Exception e) {
-            logger.warning(
-                    "The Imap acquisition failed due to the hazelcast node 
being offline or restarted, and will be retried next time",
-                    e);
         }
         this.printTaskExecutionRuntimeInfo();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 55249babc..4c9a48a71 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -30,6 +30,8 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 
+import java.util.HashMap;
+
 public class TaskExecutionContext {
 
     private final Task task;
@@ -56,9 +58,13 @@ public class TaskExecutionContext {
     }
 
     public SeaTunnelMetricsContext getOrCreateMetricsContext(TaskLocation 
taskLocation) {
-        IMap<TaskLocation, SeaTunnelMetricsContext> map =
+        IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> map =
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
-        return map.computeIfAbsent(taskLocation, k -> new 
SeaTunnelMetricsContext());
+        HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
+                map.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+        return centralMap == null || centralMap.get(taskLocation) == null
+                ? new SeaTunnelMetricsContext()
+                : centralMap.get(taskLocation);
     }
 
     public <T> T getTask() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 876ac80cc..13b89a69d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -81,6 +81,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
 
@@ -139,6 +140,8 @@ public class JobMaster {
 
     private final IMap<Long, JobInfo> runningJobInfoIMap;
 
+    private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> 
metricsImap;
+
     /** If the job or pipeline cancel by user, needRestore will be false */
     @Getter private volatile boolean needRestore = true;
 
@@ -154,6 +157,7 @@ public class JobMaster {
             @NonNull IMap runningJobStateTimestampsIMap,
             @NonNull IMap ownedSlotProfilesIMap,
             @NonNull IMap<Long, JobInfo> runningJobInfoIMap,
+            @NonNull IMap<Long, HashMap<TaskLocation, 
SeaTunnelMetricsContext>> metricsImap,
             EngineConfig engineConfig) {
         this.jobImmutableInformationData = jobImmutableInformationData;
         this.nodeEngine = nodeEngine;
@@ -169,6 +173,7 @@ public class JobMaster {
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
         this.runningJobInfoIMap = runningJobInfoIMap;
         this.engineConfig = engineConfig;
+        this.metricsImap = metricsImap;
     }
 
     public void init(long initializationTimestamp, boolean restart, boolean 
canRestoreAgain)
@@ -546,17 +551,27 @@ public class JobMaster {
             PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
         if (pipelineStatus.equals(PipelineStatus.FINISHED) && 
!checkpointManager.isSavePointEnd()
                 || pipelineStatus.equals(PipelineStatus.CANCELED)) {
-            IMap<TaskLocation, SeaTunnelMetricsContext> map =
-                    
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
-            map.keySet().stream()
-                    .filter(
-                            taskLocation -> {
-                                return taskLocation
-                                        .getTaskGroupLocation()
-                                        .getPipelineLocation()
-                                        .equals(pipelineLocation);
-                            })
-                    .forEach(map::remove);
+            try {
+                metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+                HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
+                        metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+                if (centralMap != null) {
+                    List<TaskLocation> collect =
+                            centralMap.keySet().stream()
+                                    .filter(
+                                            taskLocation -> {
+                                                return taskLocation
+                                                        .getTaskGroupLocation()
+                                                        .getPipelineLocation()
+                                                        
.equals(pipelineLocation);
+                                            })
+                                    .collect(Collectors.toList());
+                    collect.forEach(centralMap::remove);
+                    metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, 
centralMap);
+                }
+            } finally {
+                metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+            }
         }
     }
 

Reply via email to