This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3017ea540 [bugfix][zeta] fix metrics lose on pipeline restart (#3977)
3017ea540 is described below

commit 3017ea54011b5f5201bb9392e4f965483ccb537c
Author: ic4y <[email protected]>
AuthorDate: Thu Feb 2 18:11:56 2023 +0800

    [bugfix][zeta] fix metrics lose on pipeline restart (#3977)
    
    * [bugfix][zeta] fix metrics lose on pipeline restart
---
 release-note.md                                    |  2 +-
 .../seatunnel/api/common/metrics/JobMetrics.java   |  9 ++-
 .../seatunnel/api/common/metrics/Measurement.java  |  4 ++
 .../src/test/resources/junit-platform.properties   | 19 ++++++
 .../apache/seatunnel/engine/common/Constant.java   |  2 +
 .../engine/common/config/EngineConfig.java         |  9 ++-
 .../config/YamlSeaTunnelDomConfigProcessor.java    |  3 +
 .../common/config/server/ServerConfigOptions.java  |  2 +
 .../engine/server/CoordinatorService.java          |  3 +-
 .../engine/server/TaskExecutionService.java        | 41 +++++++++++++
 .../server/checkpoint/CheckpointManager.java       |  2 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |  5 +-
 .../server/execution/TaskExecutionContext.java     |  9 +++
 .../seatunnel/engine/server/master/JobMaster.java  | 13 ++++
 .../engine/server/metrics/MetricsContext.java      |  5 +-
 .../engine/server/task/SeaTunnelTask.java          |  2 +-
 .../engine/server/master/JobMetricsTest.java       | 51 +++++++++++++---
 .../src/test/resources/stream_fake_to_console.conf | 71 ++++++++++++++++++++++
 18 files changed, 234 insertions(+), 18 deletions(-)

diff --git a/release-note.md b/release-note.md
index 9d4f6a64c..f72c8ee69 100644
--- a/release-note.md
+++ b/release-note.md
@@ -53,7 +53,7 @@
 - [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED #3808
 - [Checkpoint] Add savepoint and restore with savepoint #3930
 - [Core]Fix Local Mode can't deserialize split (#3817)
-- [Metrics] Fix Metrics will lose when Job be canceled. #3797
+- [Metrics] Fix metrics being lost when a job is canceled or restarted. #3797 #3977
 
 ### Documents
 - [Doc] seatunnel run with flink operator error #3998
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
index a0f4f7a82..78e7d2d55 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -78,8 +78,13 @@ public final class JobMetrics implements Serializable {
         Map<String, List<Measurement>> metricsMap = new HashMap<>();
         metrics.forEach((key, value) -> metricsMap.put(key, new 
ArrayList<>(value)));
         jobMetrics.metrics.forEach((key, value) -> metricsMap.merge(key, 
value, (v1, v2) -> {
-            v1.addAll(v2);
-            return v1;
+            List<Measurement> ms = new ArrayList<>(v2);
+            for (Measurement m1 : v1) {
+                if (v2.stream().noneMatch(m2 -> 
m2.getTags().equals(m1.getTags()))) {
+                    ms.add(m1);
+                }
+            }
+            return ms;
         }));
         return new JobMetrics(metricsMap);
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
index 6ea645955..58d521652 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
@@ -109,6 +109,10 @@ public final class Measurement implements Serializable {
         return tags.get(name);
     }
 
+    public Map<String, String> getTags() {
+        return tags;
+    }
+
     @Override
     public int hashCode() {
         return 31 * (int) (timestamp * 31 + value.hashCode()) + 
Objects.hashCode(tags);
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/junit-platform.properties
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/junit-platform.properties
new file mode 100644
index 000000000..1b9e4750c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/junit-platform.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+junit.jupiter.execution.parallel.mode.default = same_thread
+junit.jupiter.execution.parallel.mode.classes.default = same_thread
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 50e70f426..0fb820cba 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -57,4 +57,6 @@ public class Constant {
     public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-%d";
 
     public static final String IMAP_RESOURCE_MANAGER_REGISTER_WORKER = 
"ResourceManager_RegisterWorker";
+
+    public static final String IMAP_RUNNING_JOB_METRICS = 
"engine_runningJobMetrics";
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index f4d1d066c..a51b8a128 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -36,6 +36,8 @@ public class EngineConfig {
 
     private int printJobMetricsInfoInterval = 
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue();
 
+    private int jobMetricsBackupInterval = 
ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.defaultValue();
+
     private SlotServiceConfig slotServiceConfig = 
ServerConfigOptions.SLOT_SERVICE.defaultValue();
 
     private CheckpointConfig checkpointConfig = 
ServerConfigOptions.CHECKPOINT.defaultValue();
@@ -53,10 +55,15 @@ public class EngineConfig {
     }
 
     public void setPrintJobMetricsInfoInterval(int 
printJobMetricsInfoInterval) {
-        checkPositive(printExecutionInfoInterval, 
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
+        checkPositive(printJobMetricsInfoInterval, 
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
         this.printJobMetricsInfoInterval = printJobMetricsInfoInterval;
     }
 
+    public void setJobMetricsBackupInterval(int jobMetricsBackupInterval) {
+        checkPositive(jobMetricsBackupInterval, 
ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL + " must be > 0");
+        this.jobMetricsBackupInterval = jobMetricsBackupInterval;
+    }
+
     public void setQueueType(QueueType queueType) {
         checkNotNull(queueType);
         this.queueType = queueType;
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 346b377e7..e7b4fb907 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -105,6 +105,9 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
             } else if 
(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) {
                 
engineConfig.setPrintJobMetricsInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(),
                     getTextContent(node)));
+            } else if 
(ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.key().equals(name)) {
+                
engineConfig.setJobMetricsBackupInterval(getIntegerValue(ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.key(),
+                    getTextContent(node)));
             } else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) {
                 
engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
             } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 61c1e5073..7f0d30d17 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -33,6 +33,8 @@ public class ServerConfigOptions {
 
     public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL = 
Options.key("print-job-metrics-info-interval").intType().defaultValue(60).withDescription("The
 interval (in seconds) of job print metrics info");
 
+    public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL = 
Options.key("job-metrics-backup-interval").intType().defaultValue(3).withDescription("The
 interval (in seconds) of job metrics backups");
+
     public static final Option<Boolean> DYNAMIC_SLOT = 
Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether
 to use dynamic slot.");
 
     public static final Option<Integer> SLOT_NUM = 
Options.key("slot-num").intType().defaultValue(2).withDescription("The number 
of slots. Only valid when dynamic slot is disabled.");
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index dd053d2b9..f5e502201 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -430,7 +430,8 @@ public class CoordinatorService {
     public JobStatus getJobStatus(long jobId) {
         JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
         if (runningJobMaster == null) {
-            return jobHistoryService.getJobDetailState(jobId).getJobStatus();
+            JobHistoryService.JobStateData jobDetailState = 
jobHistoryService.getJobDetailState(jobId);
+            return null == jobDetailState ? null : 
jobDetailState.getJobStatus();
         }
         return runningJobMaster.getJobStatus();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 0f65aea28..688a38c69 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -32,6 +32,9 @@ import static java.util.stream.Collectors.toList;
 
 import org.apache.seatunnel.api.common.metrics.MetricTags;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import 
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -45,6 +48,8 @@ import 
org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import 
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
 
@@ -56,6 +61,8 @@ import com.hazelcast.internal.metrics.MetricsRegistry;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
 import com.hazelcast.logging.ILogger;
+import com.hazelcast.map.IMap;
+import com.hazelcast.spi.exception.WrongTargetException;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import com.hazelcast.spi.properties.HazelcastProperties;
@@ -76,9 +83,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -101,8 +111,12 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
     private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> 
finishedExecutionContexts = new ConcurrentHashMap<>();
     private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> 
cancellationFutures =
         new ConcurrentHashMap<>();
+    private final SeaTunnelConfig seaTunnelConfig;
+
+    private final ScheduledExecutorService scheduledExecutorService;
 
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties 
properties) {
+        seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
         this.nodeEngine = nodeEngine;
         this.logger = 
nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
@@ -111,6 +125,8 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         MetricDescriptor descriptor = registry.newMetricDescriptor()
             .withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
         registry.registerStaticMetrics(descriptor, this);
+        scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+        
scheduledExecutorService.scheduleAtFixedRate(this::updateMetricsContextInImap, 
0, seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(), 
TimeUnit.SECONDS);
     }
 
     public void start() {
@@ -120,6 +136,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
     public void shutdown() {
         isRunning = false;
         executorService.shutdownNow();
+        scheduledExecutorService.shutdown();
     }
 
     public TaskGroupContext getExecutionContext(TaskGroupLocation 
taskGroupLocation) {
@@ -328,12 +345,36 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                     task.provideDynamicMetrics(copy3, context);
                 });
             });
+            updateMetricsContextInImap();
         } catch (Throwable t) {
             logger.warning("Dynamic metric collection failed", t);
             throw t;
         }
     }
 
+    private synchronized void updateMetricsContextInImap() {
+        Map<TaskGroupLocation, TaskGroupContext> contextMap = new HashMap<>();
+        contextMap.putAll(executionContexts);
+        contextMap.putAll(finishedExecutionContexts);
+        try {
+            IMap<TaskLocation, MetricsContext> map =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+            contextMap.forEach((taskGroupLocation, taskGroupContext) -> {
+                taskGroupContext.getTaskGroup().getTasks().forEach(task -> {
+                    // MetricsContext only exists in SeaTunnelTask
+                    if (task instanceof SeaTunnelTask) {
+                        SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task;
+                        if (null != seaTunnelTask.getMetricsContext()) {
+                            map.put(seaTunnelTask.getTaskLocation(), 
seaTunnelTask.getMetricsContext());
+                        }
+                    }
+                });
+            });
+        } catch (WrongTargetException e){
+            logger.warning("The Imap acquisition failed due to the hazelcast 
node being offline or restarted, and will be retried next time", e);
+        }
+    }
+
     private final class BlockingWorker implements Runnable {
 
         private final TaskTracker tracker;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index a31766b0c..4bee24c65 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -232,7 +232,7 @@ public class CheckpointManager {
         coordinator.acknowledgeTask(ackOperation);
     }
 
-    private boolean isSavePointEnd() {
+    public boolean isSavePointEnd() {
         return 
coordinatorMap.values().stream().map(CheckpointCoordinator::isEndOfSavePoint)
             .reduce((v1, v2) -> v1 && v2).orElse(false);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index b64a79022..dee54f555 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -170,7 +170,7 @@ public class PhysicalPlan {
                     }
                     LOGGER.severe("Pipeline Failed, Begin to cancel other 
pipelines in this job.");
                 }
-                subPlanDone(subPlan);
+                subPlanDone(subPlan, pipelineState.getPipelineStatus());
 
                 if (finishedPipelineNum.incrementAndGet() == 
this.pipelineList.size()) {
                     if (failedPipelineNum.get() > 0) {
@@ -189,8 +189,9 @@ public class PhysicalPlan {
         });
     }
 
-    private void subPlanDone(SubPlan subPlan) {
+    private void subPlanDone(SubPlan subPlan, PipelineStatus pipelineStatus) {
         jobMaster.savePipelineMetricsToHistory(subPlan.getPipelineLocation());
+        jobMaster.removeMetricsContext(subPlan.getPipelineLocation(), 
pipelineStatus);
         jobMaster.releasePipelineResource(subPlan);
         notifyCheckpointManagerPipelineEnd(subPlan);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 4ba8d075f..41bcbbbbc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -17,10 +17,13 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
+import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -47,6 +50,12 @@ public class TaskExecutionContext {
         return nodeEngine.getLogger(task.getClass());
     }
 
+    public MetricsContext getOrCreateMetricsContext(TaskLocation taskLocation) 
{
+        IMap<TaskLocation, MetricsContext> map =
+            
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+        return map.computeIfAbsent(taskLocation, k -> new MetricsContext());
+    }
+
     public <T> T getTask() {
         return (T) task;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index f8008fd5b..0e967a2bc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
@@ -47,7 +48,9 @@ import 
org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -343,6 +346,16 @@ public class JobMaster {
         this.cleanTaskGroupContext(pipelineLocation);
     }
 
+    public void removeMetricsContext(PipelineLocation pipelineLocation, 
PipelineStatus pipelineStatus){
+        if (pipelineStatus.equals(PipelineStatus.FINISHED) && 
!checkpointManager.isSavePointEnd() || 
pipelineStatus.equals(PipelineStatus.CANCELED)){
+            IMap<TaskLocation, MetricsContext> map =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+            map.keySet().stream().filter(taskLocation -> {
+                return 
taskLocation.getTaskGroupLocation().getPipelineLocation().equals(pipelineLocation);
+            }).forEach(map::remove);
+        }
+    }
+
     private void cleanTaskGroupContext(PipelineLocation pipelineLocation) {
         
ownedSlotProfilesIMap.get(pipelineLocation).forEach((taskGroupLocation, 
slotProfile) -> {
             try {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
index be8480229..1f72ef948 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
@@ -27,12 +27,13 @@ import 
com.hazelcast.internal.metrics.MetricsCollectionContext;
 import com.hazelcast.internal.metrics.ProbeLevel;
 import com.hazelcast.internal.metrics.ProbeUnit;
 
+import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.BiFunction;
 
-public class MetricsContext implements DynamicMetricsProvider {
+public class MetricsContext implements DynamicMetricsProvider, Serializable {
 
     private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_SINGLE_WRITER_METRIC = SingleWriterMetric::new;
     private static final BiFunction<String, Unit, AbstractMetric> 
CREATE_THREAD_SAFE_METRICS = ThreadSafeMetric::new;
@@ -94,7 +95,7 @@ public class MetricsContext implements DynamicMetricsProvider 
{
         return ProbeUnit.valueOf(unit.name());
     }
 
-    private abstract static class AbstractMetric implements Metric {
+    private abstract static class AbstractMetric implements Metric, 
Serializable {
 
         private final String name;
         private final Unit unit;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index cf8ac987e..5c434acbb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -118,7 +118,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
     @Override
     public void init() throws Exception {
         super.init();
-        metricsContext = new MetricsContext();
+        metricsContext = 
getExecutionContext().getOrCreateMetricsContext(taskLocation);
         this.currState = SeaTunnelTaskState.INIT;
         flowFutures = new ArrayList<>();
         allCycles = new ArrayList<>();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index f9130e747..1d66ab6e5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -30,7 +30,9 @@ import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.CoordinatorService;
 import org.apache.seatunnel.engine.server.TestUtils;
 
 import com.hazelcast.internal.serialization.Data;
@@ -47,14 +49,14 @@ import java.util.concurrent.TimeUnit;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 class JobMetricsTest extends AbstractSeaTunnelServerTest {
 
-    private static final Long JOB_1 = 1L;
-    private static final Long JOB_2 = 2L;
-    private static final Long JOB_3 = 3L;
+    private static final Long JOB_1 = 145234L;
+    private static final Long JOB_2 = 223452L;
+    private static final Long JOB_3 = 323475L;
 
     @Test
     public void testGetJobMetrics() throws Exception {
-        startJob(JOB_1, "fake_to_console_job_metrics.conf");
-        startJob(JOB_2, "fake_to_console_job_metrics.conf");
+        startJob(JOB_1, "fake_to_console_job_metrics.conf", false);
+        startJob(JOB_2, "fake_to_console_job_metrics.conf", false);
 
         await().atMost(60000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> {
@@ -80,11 +82,46 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
         assertTrue((Double) jobMetrics.get(SINK_WRITE_QPS).get(0).value() > 0);
     }
 
-    private void startJob(Long jobid, String path){
+    @Test
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void testMetricsOnJobRestart() throws InterruptedException {
+        CoordinatorService coordinatorService = server.getCoordinatorService();
+        startJob(JOB_3, "stream_fake_to_console.conf", false);
+        // waiting for job status turn to running
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING, 
server.getCoordinatorService().getJobStatus(JOB_3)));
+
+        Thread.sleep(10000);
+
+        
System.out.println(coordinatorService.getJobMetrics(JOB_3).toJsonString());
+
+        //start savePoint
+        coordinatorService.savePoint(JOB_3);
+
+        //waiting job FINISHED
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals(JobStatus.FINISHED, 
server.getCoordinatorService().getJobStatus(JOB_3)));
+
+        //restore job
+        startJob(JOB_3, "stream_fake_to_console.conf", true);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING, 
server.getCoordinatorService().getJobStatus(JOB_3)));
+
+        Thread.sleep(20000);
+        //check metrics
+        JobMetrics jobMetrics = coordinatorService.getJobMetrics(JOB_3);
+        System.out.println(jobMetrics.toJsonString());
+        assertTrue(80 <  (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
+        assertTrue(80 <  (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
+        assertTrue(80 <  (Long) 
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
+        assertTrue(80 <  (Long) 
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(1).value());
+    }
+
+    private void startJob(Long jobid, String path, boolean 
isStartWithSavePoint){
         LogicalDag testLogicalDag =
             TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);
 
-        JobImmutableInformation jobImmutableInformation = new 
JobImmutableInformation(jobid,
+        JobImmutableInformation jobImmutableInformation = new 
JobImmutableInformation(jobid, isStartWithSavePoint,
             nodeEngine.getSerializationService().toData(testLogicalDag), 
testLogicalDag.getJobConfig(),
             Collections.emptyList());
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console.conf
new file mode 100644
index 000000000..ef88cc717
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set the job environment configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
source plugin feature**
+    FakeSource {
+      result_table_name = "fake1"
+       row.num = 100
+       split.num = 5
+       split.read-interval = 3000
+       parallelism = 1
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+
+    FakeSource {
+      result_table_name = "fake2"
+       row.num = 100
+       split.num = 5
+       split.read-interval = 3000
+       parallelism = 1
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+}
+
+transform {
+}
+
+sink {
+      console {
+      source_table_name = "fake1"
+      }
+    console {
+    source_table_name = "fake2"
+    }
+}
\ No newline at end of file

Reply via email to