This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2e9a1bdb1 [Improve][Zeta] Reduce the operation count of imap_running_job_metrics (#4861)
2e9a1bdb1 is described below
commit 2e9a1bdb12094edf3a39a960fb785138ac5b2f17
Author: ic4y <[email protected]>
AuthorDate: Mon Jun 5 16:51:57 2023 +0800
[Improve][Zeta] Reduce the operation count of imap_running_job_metrics (#4861)
---
.../apache/seatunnel/engine/common/Constant.java | 2 +
.../engine/server/CoordinatorService.java | 8 +++
.../engine/server/TaskExecutionService.java | 77 ++++++++++++----------
.../server/execution/TaskExecutionContext.java | 10 ++-
.../seatunnel/engine/server/master/JobMaster.java | 37 +++++++----
5 files changed, 86 insertions(+), 48 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 12e13bc19..8d52a444a 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -56,4 +56,6 @@ public class Constant {
public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map";
public static final String IMAP_RUNNING_JOB_METRICS =
"engine_runningJobMetrics";
+
+ public static final Long IMAP_RUNNING_JOB_METRICS_KEY = 1L;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d38f2e028..bf913a424 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -37,9 +37,11 @@ import
org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -53,6 +55,7 @@ import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.NonNull;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -126,6 +129,8 @@ public class CoordinatorService {
*/
private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>>
ownedSlotProfilesIMap;
+ private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>
metricsImap;
+
/** If this node is a master node */
private volatile boolean isActive = false;
@@ -191,6 +196,7 @@ public class CoordinatorService {
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
+ metricsImap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
jobHistoryService =
new JobHistoryService(
@@ -256,6 +262,7 @@ public class CoordinatorService {
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
runningJobInfoIMap,
+ metricsImap,
engineConfig);
// If Job Status is CANCELLING , set needRestore to false
@@ -419,6 +426,7 @@ public class CoordinatorService {
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
runningJobInfoIMap,
+ metricsImap,
engineConfig);
executorService.submit(
() -> {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 2fe8ee485..dd92e6104 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -134,8 +134,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
private final ScheduledExecutorService scheduledExecutorService;
- private CountDownLatch waitClusterStarted;
-
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties
properties) {
seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -493,7 +491,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
task.provideDynamicMetrics(copy3,
context);
});
});
- updateMetricsContextInImap();
} catch (Throwable t) {
logger.warning("Dynamic metric collection failed", t);
throw t;
@@ -501,43 +498,53 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
private synchronized void updateMetricsContextInImap() {
+ if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ logger.warning(
+ String.format(
+ "The Node is not ready yet, Node state %s,looking
forward to the next "
+ + "scheduling",
+ nodeEngine.getNode().getState()));
+ return;
+ }
+ IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap
=
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
Map<TaskGroupLocation, TaskGroupContext> contextMap = new HashMap<>();
contextMap.putAll(finishedExecutionContexts);
contextMap.putAll(executionContexts);
- try {
- if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ HashMap<TaskLocation, SeaTunnelMetricsContext> localMap = new
HashMap<>();
+ contextMap.forEach(
+ (taskGroupLocation, taskGroupContext) -> {
+ taskGroupContext
+ .getTaskGroup()
+ .getTasks()
+ .forEach(
+ task -> {
+ // MetricsContext only exists in
SeaTunnelTask
+ if (task instanceof SeaTunnelTask) {
+ SeaTunnelTask seaTunnelTask =
(SeaTunnelTask) task;
+ if (null !=
seaTunnelTask.getMetricsContext()) {
+ localMap.put(
+
seaTunnelTask.getTaskLocation(),
+
seaTunnelTask.getMetricsContext());
+ }
+ }
+ });
+ });
+ if (localMap.size() > 0) {
+ try {
+ metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
+ metricsImap.computeIfAbsent(
+ Constant.IMAP_RUNNING_JOB_METRICS_KEY, k ->
new HashMap<>());
+ centralMap.putAll(localMap);
+ metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY,
centralMap);
+ } catch (Exception e) {
logger.warning(
- String.format(
- "The Node is not ready yet, Node state
%s,looking forward to the next "
- + "scheduling",
- nodeEngine.getNode().getState()));
- return;
+ "The Imap acquisition failed due to the hazelcast node
being offline or restarted, and will be retried next time",
+ e);
+ } finally {
+ metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
}
-
- IMap<TaskLocation, SeaTunnelMetricsContext> map =
-
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
- contextMap.forEach(
- (taskGroupLocation, taskGroupContext) -> {
- taskGroupContext
- .getTaskGroup()
- .getTasks()
- .forEach(
- task -> {
- // MetricsContext only exists in
SeaTunnelTask
- if (task instanceof SeaTunnelTask)
{
- SeaTunnelTask seaTunnelTask =
(SeaTunnelTask) task;
- if (null !=
seaTunnelTask.getMetricsContext()) {
- map.put(
-
seaTunnelTask.getTaskLocation(),
-
seaTunnelTask.getMetricsContext());
- }
- }
- });
- });
- } catch (Exception e) {
- logger.warning(
- "The Imap acquisition failed due to the hazelcast node
being offline or restarted, and will be retried next time",
- e);
}
this.printTaskExecutionRuntimeInfo();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 55249babc..4c9a48a71 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -30,6 +30,8 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+import java.util.HashMap;
+
public class TaskExecutionContext {
private final Task task;
@@ -56,9 +58,13 @@ public class TaskExecutionContext {
}
public SeaTunnelMetricsContext getOrCreateMetricsContext(TaskLocation
taskLocation) {
- IMap<TaskLocation, SeaTunnelMetricsContext> map =
+ IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
- return map.computeIfAbsent(taskLocation, k -> new
SeaTunnelMetricsContext());
+ HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
+ map.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ return centralMap == null || centralMap.get(taskLocation) == null
+ ? new SeaTunnelMetricsContext()
+ : centralMap.get(taskLocation);
}
public <T> T getTask() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 876ac80cc..13b89a69d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -81,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
@@ -139,6 +140,8 @@ public class JobMaster {
private final IMap<Long, JobInfo> runningJobInfoIMap;
+ private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>
metricsImap;
+
/** If the job or pipeline cancel by user, needRestore will be false */
@Getter private volatile boolean needRestore = true;
@@ -154,6 +157,7 @@ public class JobMaster {
@NonNull IMap runningJobStateTimestampsIMap,
@NonNull IMap ownedSlotProfilesIMap,
@NonNull IMap<Long, JobInfo> runningJobInfoIMap,
+ @NonNull IMap<Long, HashMap<TaskLocation,
SeaTunnelMetricsContext>> metricsImap,
EngineConfig engineConfig) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
@@ -169,6 +173,7 @@ public class JobMaster {
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
this.runningJobInfoIMap = runningJobInfoIMap;
this.engineConfig = engineConfig;
+ this.metricsImap = metricsImap;
}
public void init(long initializationTimestamp, boolean restart, boolean
canRestoreAgain)
@@ -546,17 +551,27 @@ public class JobMaster {
PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
if (pipelineStatus.equals(PipelineStatus.FINISHED) &&
!checkpointManager.isSavePointEnd()
|| pipelineStatus.equals(PipelineStatus.CANCELED)) {
- IMap<TaskLocation, SeaTunnelMetricsContext> map =
-
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
- map.keySet().stream()
- .filter(
- taskLocation -> {
- return taskLocation
- .getTaskGroupLocation()
- .getPipelineLocation()
- .equals(pipelineLocation);
- })
- .forEach(map::remove);
+ try {
+ metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
+ metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ if (centralMap != null) {
+ List<TaskLocation> collect =
+ centralMap.keySet().stream()
+ .filter(
+ taskLocation -> {
+ return taskLocation
+ .getTaskGroupLocation()
+ .getPipelineLocation()
+
.equals(pipelineLocation);
+ })
+ .collect(Collectors.toList());
+ collect.forEach(centralMap::remove);
+ metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY,
centralMap);
+ }
+ } finally {
+ metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ }
}
}