This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3017ea540 [bugfix][zeta] fix metrics lose on pipeline restart (#3977)
3017ea540 is described below
commit 3017ea54011b5f5201bb9392e4f965483ccb537c
Author: ic4y <[email protected]>
AuthorDate: Thu Feb 2 18:11:56 2023 +0800
[bugfix][zeta] fix metrics lose on pipeline restart (#3977)
* [bugfix][zeta] fix metrics lose on pipeline restart
---
release-note.md | 2 +-
.../seatunnel/api/common/metrics/JobMetrics.java | 9 ++-
.../seatunnel/api/common/metrics/Measurement.java | 4 ++
.../src/test/resources/junit-platform.properties | 19 ++++++
.../apache/seatunnel/engine/common/Constant.java | 2 +
.../engine/common/config/EngineConfig.java | 9 ++-
.../config/YamlSeaTunnelDomConfigProcessor.java | 3 +
.../common/config/server/ServerConfigOptions.java | 2 +
.../engine/server/CoordinatorService.java | 3 +-
.../engine/server/TaskExecutionService.java | 41 +++++++++++++
.../server/checkpoint/CheckpointManager.java | 2 +-
.../engine/server/dag/physical/PhysicalPlan.java | 5 +-
.../server/execution/TaskExecutionContext.java | 9 +++
.../seatunnel/engine/server/master/JobMaster.java | 13 ++++
.../engine/server/metrics/MetricsContext.java | 5 +-
.../engine/server/task/SeaTunnelTask.java | 2 +-
.../engine/server/master/JobMetricsTest.java | 51 +++++++++++++---
.../src/test/resources/stream_fake_to_console.conf | 71 ++++++++++++++++++++++
18 files changed, 234 insertions(+), 18 deletions(-)
diff --git a/release-note.md b/release-note.md
index 9d4f6a64c..f72c8ee69 100644
--- a/release-note.md
+++ b/release-note.md
@@ -53,7 +53,7 @@
- [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED #3808
- [Checkpoint] Add savepoint and restore with savepoint #3930
- [Core]Fix Local Mode can't deserialize split (#3817)
-- [Metrics] Fix Metrics will lose when Job be canceled. #3797
+- [Metrics] Fix Metrics being lost when Job is canceled or restarted. #3797 #3977
### Documents
- [Doc] seatunnel run with flink operator error #3998
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
index a0f4f7a82..78e7d2d55 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -78,8 +78,13 @@ public final class JobMetrics implements Serializable {
Map<String, List<Measurement>> metricsMap = new HashMap<>();
metrics.forEach((key, value) -> metricsMap.put(key, new
ArrayList<>(value)));
jobMetrics.metrics.forEach((key, value) -> metricsMap.merge(key,
value, (v1, v2) -> {
- v1.addAll(v2);
- return v1;
+ List<Measurement> ms = new ArrayList<>(v2);
+ for (Measurement m1 : v1) {
+ if (v2.stream().noneMatch(m2 ->
m2.getTags().equals(m1.getTags()))) {
+ ms.add(m1);
+ }
+ }
+ return ms;
}));
return new JobMetrics(metricsMap);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
index 6ea645955..58d521652 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/Measurement.java
@@ -109,6 +109,10 @@ public final class Measurement implements Serializable {
return tags.get(name);
}
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
@Override
public int hashCode() {
return 31 * (int) (timestamp * 31 + value.hashCode()) +
Objects.hashCode(tags);
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/junit-platform.properties
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/junit-platform.properties
new file mode 100644
index 000000000..1b9e4750c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/junit-platform.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+junit.jupiter.execution.parallel.mode.default = same_thread
+junit.jupiter.execution.parallel.mode.classes.default = same_thread
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 50e70f426..0fb820cba 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -57,4 +57,6 @@ public class Constant {
public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-%d";
public static final String IMAP_RESOURCE_MANAGER_REGISTER_WORKER =
"ResourceManager_RegisterWorker";
+
+ public static final String IMAP_RUNNING_JOB_METRICS =
"engine_runningJobMetrics";
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index f4d1d066c..a51b8a128 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -36,6 +36,8 @@ public class EngineConfig {
private int printJobMetricsInfoInterval =
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue();
+ private int jobMetricsBackupInterval =
ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.defaultValue();
+
private SlotServiceConfig slotServiceConfig =
ServerConfigOptions.SLOT_SERVICE.defaultValue();
private CheckpointConfig checkpointConfig =
ServerConfigOptions.CHECKPOINT.defaultValue();
@@ -53,10 +55,15 @@ public class EngineConfig {
}
public void setPrintJobMetricsInfoInterval(int
printJobMetricsInfoInterval) {
- checkPositive(printExecutionInfoInterval,
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
+ checkPositive(printJobMetricsInfoInterval,
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
this.printJobMetricsInfoInterval = printJobMetricsInfoInterval;
}
+ public void setJobMetricsBackupInterval(int jobMetricsBackupInterval) {
+ checkPositive(jobMetricsBackupInterval,
ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL + " must be > 0");
+ this.jobMetricsBackupInterval = jobMetricsBackupInterval;
+ }
+
public void setQueueType(QueueType queueType) {
checkNotNull(queueType);
this.queueType = queueType;
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 346b377e7..e7b4fb907 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -105,6 +105,9 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
} else if
(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) {
engineConfig.setPrintJobMetricsInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(),
getTextContent(node)));
+ } else if
(ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.key().equals(name)) {
+
engineConfig.setJobMetricsBackupInterval(getIntegerValue(ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.key(),
+ getTextContent(node)));
} else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) {
engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
} else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 61c1e5073..7f0d30d17 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -33,6 +33,8 @@ public class ServerConfigOptions {
public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL =
Options.key("print-job-metrics-info-interval").intType().defaultValue(60).withDescription("The
interval (in seconds) of job print metrics info");
+ public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL =
Options.key("job-metrics-backup-interval").intType().defaultValue(3).withDescription("The
interval (in seconds) of job metrics backups");
+
public static final Option<Boolean> DYNAMIC_SLOT =
Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether
to use dynamic slot.");
public static final Option<Integer> SLOT_NUM =
Options.key("slot-num").intType().defaultValue(2).withDescription("The number
of slots. Only valid when dynamic slot is disabled.");
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index dd053d2b9..f5e502201 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -430,7 +430,8 @@ public class CoordinatorService {
public JobStatus getJobStatus(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
- return jobHistoryService.getJobDetailState(jobId).getJobStatus();
+ JobHistoryService.JobStateData jobDetailState =
jobHistoryService.getJobDetailState(jobId);
+ return null == jobDetailState ? null :
jobDetailState.getJobStatus();
}
return runningJobMaster.getJobStatus();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 0f65aea28..688a38c69 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -32,6 +32,9 @@ import static java.util.stream.Collectors.toList;
import org.apache.seatunnel.api.common.metrics.MetricTags;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -45,6 +48,8 @@ import
org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
@@ -56,6 +61,8 @@ import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.logging.ILogger;
+import com.hazelcast.map.IMap;
+import com.hazelcast.spi.exception.WrongTargetException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import com.hazelcast.spi.properties.HazelcastProperties;
@@ -76,9 +83,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -101,8 +111,12 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
private final ConcurrentMap<TaskGroupLocation, TaskGroupContext>
finishedExecutionContexts = new ConcurrentHashMap<>();
private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>>
cancellationFutures =
new ConcurrentHashMap<>();
+ private final SeaTunnelConfig seaTunnelConfig;
+
+ private final ScheduledExecutorService scheduledExecutorService;
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties
properties) {
+ seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
this.nodeEngine = nodeEngine;
this.logger =
nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
@@ -111,6 +125,8 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
MetricDescriptor descriptor = registry.newMetricDescriptor()
.withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
registry.registerStaticMetrics(descriptor, this);
+ scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
+
scheduledExecutorService.scheduleAtFixedRate(this::updateMetricsContextInImap,
0, seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(),
TimeUnit.SECONDS);
}
public void start() {
@@ -120,6 +136,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
public void shutdown() {
isRunning = false;
executorService.shutdownNow();
+ scheduledExecutorService.shutdown();
}
public TaskGroupContext getExecutionContext(TaskGroupLocation
taskGroupLocation) {
@@ -328,12 +345,36 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
task.provideDynamicMetrics(copy3, context);
});
});
+ updateMetricsContextInImap();
} catch (Throwable t) {
logger.warning("Dynamic metric collection failed", t);
throw t;
}
}
+ private synchronized void updateMetricsContextInImap() {
+ Map<TaskGroupLocation, TaskGroupContext> contextMap = new HashMap<>();
+ contextMap.putAll(executionContexts);
+ contextMap.putAll(finishedExecutionContexts);
+ try {
+ IMap<TaskLocation, MetricsContext> map =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ contextMap.forEach((taskGroupLocation, taskGroupContext) -> {
+ taskGroupContext.getTaskGroup().getTasks().forEach(task -> {
+ // MetricsContext only exists in SeaTunnelTask
+ if (task instanceof SeaTunnelTask) {
+ SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task;
+ if (null != seaTunnelTask.getMetricsContext()) {
+ map.put(seaTunnelTask.getTaskLocation(),
seaTunnelTask.getMetricsContext());
+ }
+ }
+ });
+ });
+ } catch (WrongTargetException e){
+ logger.warning("The Imap acquisition failed due to the hazelcast
node being offline or restarted, and will be retried next time", e);
+ }
+ }
+
private final class BlockingWorker implements Runnable {
private final TaskTracker tracker;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index a31766b0c..4bee24c65 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -232,7 +232,7 @@ public class CheckpointManager {
coordinator.acknowledgeTask(ackOperation);
}
- private boolean isSavePointEnd() {
+ public boolean isSavePointEnd() {
return
coordinatorMap.values().stream().map(CheckpointCoordinator::isEndOfSavePoint)
.reduce((v1, v2) -> v1 && v2).orElse(false);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index b64a79022..dee54f555 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -170,7 +170,7 @@ public class PhysicalPlan {
}
LOGGER.severe("Pipeline Failed, Begin to cancel other
pipelines in this job.");
}
- subPlanDone(subPlan);
+ subPlanDone(subPlan, pipelineState.getPipelineStatus());
if (finishedPipelineNum.incrementAndGet() ==
this.pipelineList.size()) {
if (failedPipelineNum.get() > 0) {
@@ -189,8 +189,9 @@ public class PhysicalPlan {
});
}
- private void subPlanDone(SubPlan subPlan) {
+ private void subPlanDone(SubPlan subPlan, PipelineStatus pipelineStatus) {
jobMaster.savePipelineMetricsToHistory(subPlan.getPipelineLocation());
+ jobMaster.removeMetricsContext(subPlan.getPipelineLocation(),
pipelineStatus);
jobMaster.releasePipelineResource(subPlan);
notifyCheckpointManagerPipelineEnd(subPlan);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 4ba8d075f..41bcbbbbc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -17,10 +17,13 @@
package org.apache.seatunnel.engine.server.execution;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
+import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -47,6 +50,12 @@ public class TaskExecutionContext {
return nodeEngine.getLogger(task.getClass());
}
+ public MetricsContext getOrCreateMetricsContext(TaskLocation taskLocation)
{
+ IMap<TaskLocation, MetricsContext> map =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ return map.computeIfAbsent(taskLocation, k -> new MetricsContext());
+ }
+
public <T> T getTask() {
return (T) task;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index f8008fd5b..0e967a2bc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
@@ -47,7 +48,9 @@ import
org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
+import org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -343,6 +346,16 @@ public class JobMaster {
this.cleanTaskGroupContext(pipelineLocation);
}
+ public void removeMetricsContext(PipelineLocation pipelineLocation,
PipelineStatus pipelineStatus){
+ if (pipelineStatus.equals(PipelineStatus.FINISHED) &&
!checkpointManager.isSavePointEnd() ||
pipelineStatus.equals(PipelineStatus.CANCELED)){
+ IMap<TaskLocation, MetricsContext> map =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ map.keySet().stream().filter(taskLocation -> {
+ return
taskLocation.getTaskGroupLocation().getPipelineLocation().equals(pipelineLocation);
+ }).forEach(map::remove);
+ }
+ }
+
private void cleanTaskGroupContext(PipelineLocation pipelineLocation) {
ownedSlotProfilesIMap.get(pipelineLocation).forEach((taskGroupLocation,
slotProfile) -> {
try {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
index be8480229..1f72ef948 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/MetricsContext.java
@@ -27,12 +27,13 @@ import
com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
+import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiFunction;
-public class MetricsContext implements DynamicMetricsProvider {
+public class MetricsContext implements DynamicMetricsProvider, Serializable {
private static final BiFunction<String, Unit, AbstractMetric>
CREATE_SINGLE_WRITER_METRIC = SingleWriterMetric::new;
private static final BiFunction<String, Unit, AbstractMetric>
CREATE_THREAD_SAFE_METRICS = ThreadSafeMetric::new;
@@ -94,7 +95,7 @@ public class MetricsContext implements DynamicMetricsProvider
{
return ProbeUnit.valueOf(unit.name());
}
- private abstract static class AbstractMetric implements Metric {
+ private abstract static class AbstractMetric implements Metric,
Serializable {
private final String name;
private final Unit unit;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index cf8ac987e..5c434acbb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -118,7 +118,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
@Override
public void init() throws Exception {
super.init();
- metricsContext = new MetricsContext();
+ metricsContext =
getExecutionContext().getOrCreateMetricsContext(taskLocation);
this.currState = SeaTunnelTaskState.INIT;
flowFutures = new ArrayList<>();
allCycles = new ArrayList<>();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index f9130e747..1d66ab6e5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -30,7 +30,9 @@ import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.TestUtils;
import com.hazelcast.internal.serialization.Data;
@@ -47,14 +49,14 @@ import java.util.concurrent.TimeUnit;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class JobMetricsTest extends AbstractSeaTunnelServerTest {
- private static final Long JOB_1 = 1L;
- private static final Long JOB_2 = 2L;
- private static final Long JOB_3 = 3L;
+ private static final Long JOB_1 = 145234L;
+ private static final Long JOB_2 = 223452L;
+ private static final Long JOB_3 = 323475L;
@Test
public void testGetJobMetrics() throws Exception {
- startJob(JOB_1, "fake_to_console_job_metrics.conf");
- startJob(JOB_2, "fake_to_console_job_metrics.conf");
+ startJob(JOB_1, "fake_to_console_job_metrics.conf", false);
+ startJob(JOB_2, "fake_to_console_job_metrics.conf", false);
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
@@ -80,11 +82,46 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
assertTrue((Double) jobMetrics.get(SINK_WRITE_QPS).get(0).value() > 0);
}
- private void startJob(Long jobid, String path){
+ @Test
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ public void testMetricsOnJobRestart() throws InterruptedException {
+ CoordinatorService coordinatorService = server.getCoordinatorService();
+ startJob(JOB_3, "stream_fake_to_console.conf", false);
+ // waiting for job status turn to running
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING,
server.getCoordinatorService().getJobStatus(JOB_3)));
+
+ Thread.sleep(10000);
+
+
System.out.println(coordinatorService.getJobMetrics(JOB_3).toJsonString());
+
+ //start savePoint
+ coordinatorService.savePoint(JOB_3);
+
+ //waiting job FINISHED
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(JobStatus.FINISHED,
server.getCoordinatorService().getJobStatus(JOB_3)));
+
+ //restore job
+ startJob(JOB_3, "stream_fake_to_console.conf", true);
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING,
server.getCoordinatorService().getJobStatus(JOB_3)));
+
+ Thread.sleep(20000);
+ //check metrics
+ JobMetrics jobMetrics = coordinatorService.getJobMetrics(JOB_3);
+ System.out.println(jobMetrics.toJsonString());
+ assertTrue(80 < (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
+ assertTrue(80 < (Long)
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
+ assertTrue(80 < (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
+ assertTrue(80 < (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(1).value());
+ }
+
+ private void startJob(Long jobid, String path, boolean
isStartWithSavePoint){
LogicalDag testLogicalDag =
TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);
- JobImmutableInformation jobImmutableInformation = new
JobImmutableInformation(jobid,
+ JobImmutableInformation jobImmutableInformation = new
JobImmutableInformation(jobid, isStartWithSavePoint,
nodeEngine.getSerializationService().toData(testLogicalDag),
testLogicalDag.getJobConfig(),
Collections.emptyList());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console.conf
new file mode 100644
index 000000000..ef88cc717
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
###### This config file is a demonstration of stream processing in
SeaTunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ FakeSource {
+ result_table_name = "fake1"
+ row.num = 100
+ split.num = 5
+ split.read-interval = 3000
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+
+ FakeSource {
+ result_table_name = "fake2"
+ row.num = 100
+ split.num = 5
+ split.read-interval = 3000
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ source_table_name = "fake1"
+ }
+ console {
+ source_table_name = "fake2"
+ }
+}
\ No newline at end of file