This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7385f4d6ae [Fix][Zeta] Fix memory leak caused by failed pipeline
cleanup after unstable worker communication (#10418)
7385f4d6ae is described below
commit 7385f4d6ae43e38835cc53f78dc05c6f112cddfa
Author: Jast <[email protected]>
AuthorDate: Wed Mar 18 23:27:07 2026 +0800
[Fix][Zeta] Fix memory leak caused by failed pipeline cleanup after
unstable worker communication (#10418)
---
.../apache/seatunnel/engine/common/Constant.java | 2 +
.../engine/server/CoordinatorService.java | 190 ++++++++++++++-
.../engine/server/dag/physical/SubPlan.java | 2 +
.../seatunnel/engine/server/master/JobMaster.java | 66 +++++
.../master/cleanup/PipelineCleanupRecord.java | 174 +++++++++++++
.../serializable/ResourceDataSerializerHook.java | 5 +
.../CoordinatorServicePipelineCleanupTest.java | 270 +++++++++++++++++++++
...ineCleanupRecordHazelcastSerializationTest.java | 101 ++++++++
.../master/cleanup/PipelineCleanupRecordTest.java | 147 +++++++++++
9 files changed, 956 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index eadc73005f..da8c8abd09 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -59,6 +59,8 @@ public class Constant {
public static final String IMAP_RUNNING_JOB_METRICS =
"engine_runningJobMetrics";
+ /** Name of the IMap holding PipelineCleanupRecord entries keyed by PipelineLocation. */
+ public static final String IMAP_PENDING_PIPELINE_CLEANUP =
"engine_pendingPipelineCleanup";
+
public static final String IMAP_CHECKPOINT_MONITOR =
"engine_checkpoint_monitor";
public static final String IMAP_CONNECTOR_JAR_REF_COUNTERS =
"engine_connectorJarRefCounters";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 77c281a168..f4273ed497 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -63,6 +63,7 @@ import
org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import
org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
@@ -70,6 +71,7 @@ import
org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
+import
org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation;
import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
import org.apache.seatunnel.engine.server.telemetry.metrics.entity.JobCounter;
import
org.apache.seatunnel.engine.server.telemetry.metrics.entity.ThreadPoolStatus;
@@ -110,6 +112,7 @@ import java.util.stream.Collectors;
import static
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap;
public class CoordinatorService {
+ /** Initial delay and period, in seconds, of the scheduled pending-pipeline cleanup task. */
+ private static final int PIPELINE_CLEANUP_INTERVAL_SECONDS = 60;
private final NodeEngineImpl nodeEngine;
private final ILogger logger;
@@ -175,6 +178,8 @@ public class CoordinatorService {
private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>
metricsImap;
+ /** Pending pipeline cleanup records keyed by pipeline; may be null before the IMaps are initialised. */
+ private IMap<PipelineLocation, PipelineCleanupRecord>
pendingPipelineCleanupIMap;
+
/** If this node is a master node */
private volatile boolean isActive = false;
@@ -184,6 +189,8 @@ public class CoordinatorService {
private final ScheduledExecutorService masterActiveListener;
+ /** Dedicated single-thread scheduler that periodically runs the pending pipeline cleanup. */
+ private final ScheduledExecutorService pipelineCleanupScheduler;
+
private final EngineConfig engineConfig;
private ConnectorPackageService connectorPackageService;
@@ -219,6 +226,16 @@ public class CoordinatorService {
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+ pipelineCleanupScheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("seatunnel-pipeline-cleanup-%d")
+ .build());
+ pipelineCleanupScheduler.scheduleAtFixedRate(
+ this::cleanupPendingPipelines,
+ PIPELINE_CLEANUP_INTERVAL_SECONDS,
+ PIPELINE_CLEANUP_INTERVAL_SECONDS,
+ TimeUnit.SECONDS);
scheduleStrategy = engineConfig.getScheduleStrategy();
isWaitStrategy = scheduleStrategy.equals(ScheduleStrategy.WAIT);
logger.info("Start pending job schedule thread");
@@ -413,7 +430,8 @@ public class CoordinatorService {
ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
metricsImap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
-
+ pendingPipelineCleanupIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
jobHistoryService =
new JobHistoryService(
nodeEngine,
@@ -448,6 +466,168 @@ public class CoordinatorService {
this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
}
+    /**
+     * Periodic task that retries cleanup for every record in the pending-pipeline-cleanup IMap.
+     * It does nothing unless this node is the active master.
+     *
+     * <p>This method must never let an exception escape: it runs under {@code
+     * scheduleAtFixedRate}, and an exception thrown by such a task suppresses all of its later
+     * executions, which would silently stop pending cleanups for the rest of the process.
+     */
+    private void cleanupPendingPipelines() {
+        if (!isActive) {
+            return;
+        }
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                this.pendingPipelineCleanupIMap;
+        if (pendingCleanupIMap == null) {
+            return;
+        }
+
+        try {
+            // isEmpty() is a cluster call and may throw, so it must be inside the guard.
+            if (pendingCleanupIMap.isEmpty()) {
+                return;
+            }
+            for (Map.Entry<PipelineLocation, PipelineCleanupRecord> entry :
+                    pendingCleanupIMap.entrySet()) {
+                if (!isActive) {
+                    // This node lost the master role mid-pass; leave the rest to the new master.
+                    return;
+                }
+                try {
+                    processPendingPipelineCleanup(entry.getKey(), entry.getValue());
+                } catch (HazelcastInstanceNotActiveException e) {
+                    // The whole instance is going away: abort the pass (logged below).
+                    throw e;
+                } catch (Exception e) {
+                    // Isolate per-record failures so one bad record cannot starve the others.
+                    logger.warning(
+                            String.format(
+                                    "Pending pipeline cleanup for %s failed, will retry: %s",
+                                    entry.getKey(), ExceptionUtils.getMessage(e)),
+                            e);
+                }
+            }
+        } catch (HazelcastInstanceNotActiveException e) {
+            logger.warning(
+                    String.format(
+                            "Skip pending pipeline cleanup: hazelcast not active: %s",
+                            ExceptionUtils.getMessage(e)));
+        } catch (Throwable t) {
+            logger.warning(
+                    String.format(
+                            "Unexpected exception in pending pipeline cleanup: %s",
+                            ExceptionUtils.getMessage(t)),
+                    t);
+        }
+    }
+
+    /**
+     * Performs one cleanup attempt for a single pending record.
+     *
+     * <p>A record that no longer qualifies (see {@code shouldCleanup}) is removed. A pipeline
+     * whose state in the running-job-state IMap is not an end state is skipped. Otherwise this
+     * method:
+     *
+     * <ol>
+     *   <li>removes the pipeline's metrics;
+     *   <li>sends {@link CleanTaskGroupContextOperation} to the worker of each task group not yet
+     *       cleaned;
+     *   <li>persists the progress with a compare-and-replace against {@code record};
+     *   <li>deletes the record once {@link PipelineCleanupRecord#isCleaned()} holds.
+     * </ol>
+     *
+     * <p>NOTE(review): task groups whose worker address is null, or no longer a cluster member,
+     * are skipped without being marked cleaned. Such a record is retried on every pass and is
+     * never removed. Confirm whether a departed worker should count as cleaned.
+     *
+     * @param pipelineLocation key of the pending record
+     * @param record the value read from the IMap, used as the expected value for replace()
+     */
+    private void processPendingPipelineCleanup(
+            PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
+        if (pipelineLocation == null || record == null) {
+            return;
+        }
+        if (!shouldCleanup(record)) {
+            removePendingCleanupRecord(pipelineLocation, record);
+            return;
+        }
+
+        // Do not release resources while the pipeline is reported in a non-terminal state.
+        PipelineStatus currentStatus = getPipelineStatusFromIMap(pipelineLocation);
+        if (currentStatus != null && !currentStatus.isEndState()) {
+            return;
+        }
+
+        // Mutate a copy so that `record` stays the exact expected value for replace() below.
+        PipelineCleanupRecord updated = copy(record);
+        updated.setLastAttemptTimeMillis(System.currentTimeMillis());
+        updated.setAttemptCount(record.getAttemptCount() + 1);
+
+        if (!updated.isMetricsImapCleaned() && cleanupPipelineMetrics(pipelineLocation)) {
+            updated.setMetricsImapCleaned(true);
+        }
+
+        Map<TaskGroupLocation, Address> taskGroups = updated.getTaskGroups();
+        if (taskGroups != null && !taskGroups.isEmpty()) {
+            for (Map.Entry<TaskGroupLocation, Address> taskGroup : taskGroups.entrySet()) {
+                TaskGroupLocation taskGroupLocation = taskGroup.getKey();
+                if (updated.getCleanedTaskGroups() != null
+                        && updated.getCleanedTaskGroups().contains(taskGroupLocation)) {
+                    continue;
+                }
+                Address workerAddress = taskGroup.getValue();
+                if (workerAddress == null
+                        || nodeEngine.getClusterService().getMember(workerAddress) == null) {
+                    continue;
+                }
+                try {
+                    NodeEngineUtil.sendOperationToMemberNode(
+                                    nodeEngine,
+                                    new CleanTaskGroupContextOperation(taskGroupLocation),
+                                    workerAddress)
+                            .get();
+                    updated.getCleanedTaskGroups().add(taskGroupLocation);
+                } catch (InterruptedException e) {
+                    // Interrupted, e.g. by shutdownNow() of the cleanup scheduler: restore the
+                    // flag and stop contacting workers. Unpersisted progress is redone next pass.
+                    Thread.currentThread().interrupt();
+                    return;
+                } catch (HazelcastInstanceNotActiveException e) {
+                    logger.warning(
+                            String.format(
+                                    "%s clean TaskGroupContext failed: %s",
+                                    taskGroupLocation, ExceptionUtils.getMessage(e)));
+                } catch (Exception e) {
+                    logger.warning(
+                            String.format(
+                                    "%s clean TaskGroupContext failed: %s",
+                                    taskGroupLocation, ExceptionUtils.getMessage(e)),
+                            e);
+                }
+            }
+        }
+
+        // If another writer changed the record meanwhile, replace() fails and the next pass
+        // works from the newer value; only the winner of the replace may delete the record.
+        boolean replaced = pendingPipelineCleanupIMap.replace(pipelineLocation, record, updated);
+        if (!replaced) {
+            return;
+        }
+        if (updated.isCleaned()) {
+            pendingPipelineCleanupIMap.remove(pipelineLocation, updated);
+        }
+    }
+
+ /**
+ * Removes the pending record only if the IMap still holds exactly {@code record}
+ * (conditional remove). Any exception is logged as a warning and not propagated.
+ */
+ private void removePendingCleanupRecord(
+ PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
+ try {
+ pendingPipelineCleanupIMap.remove(pipelineLocation, record);
+ } catch (Exception e) {
+ logger.warning(
+ String.format(
+ "Remove pending pipeline cleanup record failed:
%s",
+ ExceptionUtils.getMessage(e)),
+ e);
+ }
+ }
+
+    /**
+     * Tells whether a pending record is still eligible for cleanup.
+     *
+     * <p>Only records whose final status is {@code CANCELED}, or {@code FINISHED} without a
+     * savepoint end, are eligible.
+     *
+     * @param record the pending record; may be null
+     * @return true if cleanup should proceed for this record
+     */
+    private boolean shouldCleanup(PipelineCleanupRecord record) {
+        if (record == null) {
+            return false;
+        }
+        PipelineStatus finalStatus = record.getFinalStatus();
+        if (finalStatus == null || record.isSavepointEnd()) {
+            return false;
+        }
+        switch (finalStatus) {
+            case CANCELED:
+            case FINISHED:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Looks up the pipeline's current status in the running-job-state IMap.
+     *
+     * @param pipelineLocation the pipeline to look up
+     * @return the stored {@link PipelineStatus}, or null when the map is not initialised, has no
+     *     entry for the pipeline, or holds a value of another type
+     */
+    private PipelineStatus getPipelineStatusFromIMap(PipelineLocation pipelineLocation) {
+        if (runningJobStateIMap == null) {
+            return null;
+        }
+        Object state = runningJobStateIMap.get(pipelineLocation);
+        if (state instanceof PipelineStatus) {
+            return (PipelineStatus) state;
+        }
+        return null;
+    }
+
+    /**
+     * Builds a detached copy of a record whose collections can be mutated freely.
+     *
+     * <p>The cleaned-task-group set is always a fresh mutable {@link HashSet}. The task-group map
+     * is a {@link HashMap} copy, or an immutable empty map when the source has none.
+     *
+     * @param source the record to copy; must not be null
+     * @return a new record with the same field values
+     */
+    private PipelineCleanupRecord copy(PipelineCleanupRecord source) {
+        Map<TaskGroupLocation, Address> taskGroupsCopy;
+        if (source.getTaskGroups() != null) {
+            taskGroupsCopy = new HashMap<>(source.getTaskGroups());
+        } else {
+            taskGroupsCopy = Collections.emptyMap();
+        }
+        Set<TaskGroupLocation> cleanedCopy;
+        if (source.getCleanedTaskGroups() != null) {
+            cleanedCopy = new HashSet<>(source.getCleanedTaskGroups());
+        } else {
+            cleanedCopy = new HashSet<>();
+        }
+        return new PipelineCleanupRecord(
+                source.getPipelineLocation(),
+                source.getFinalStatus(),
+                source.isSavepointEnd(),
+                taskGroupsCopy,
+                cleanedCopy,
+                source.isMetricsImapCleaned(),
+                source.getCreateTimeMillis(),
+                source.getLastAttemptTimeMillis(),
+                source.getAttemptCount());
+    }
+
+ /**
+ * Removes the pipeline's metrics through seaTunnelServer.removeMetrics.
+ *
+ * @return true if the call completed; false if it threw (a warning is logged instead)
+ */
+ private boolean cleanupPipelineMetrics(PipelineLocation pipelineLocation) {
+ try {
+ seaTunnelServer.removeMetrics(pipelineLocation);
+ return true;
+ } catch (Exception e) {
+ logger.warning(
+ String.format(
+ "Failed to remove metrics context for pipeline %s:
%s",
+ pipelineLocation, ExceptionUtils.getMessage(e)),
+ e);
+ return false;
+ }
+ }
+
private void restoreAllRunningJobFromMasterNodeSwitch() {
List<Map.Entry<Long, JobInfo>> needRestoreFromMasterNodeSwitchJobs;
try {
@@ -960,6 +1140,9 @@ public class CoordinatorService {
if (masterActiveListener != null) {
masterActiveListener.shutdownNow();
}
+ if (pipelineCleanupScheduler != null) {
+ pipelineCleanupScheduler.shutdownNow();
+ }
clearCoordinatorService();
}
@@ -1224,6 +1407,11 @@ public class CoordinatorService {
return metricsImap;
}
+    /** Runs one pass of the pending pipeline cleanup synchronously on the calling thread. */
+    @VisibleForTesting
+    void runPendingPipelineCleanupOnce() {
+        this.cleanupPendingPipelines();
+    }
+
@VisibleForTesting
public PeekBlockingQueue<PendingJobInfo> getPendingJobQueue() {
return pendingJobQueue;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index ee54911142..5e9aa83487 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -307,6 +307,8 @@ public class SubPlan {
try {
RetryUtils.retryWithException(
() -> {
+ jobMaster.enqueuePipelineCleanupIfNeeded(
+ getPipelineLocation(), pipelineStatus);
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
try {
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index ee1edc826c..20aaf667b3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -67,6 +67,7 @@ import
org.apache.seatunnel.engine.server.dag.physical.ResourceUtils;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import
org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
@@ -896,6 +897,71 @@ public class JobMaster {
this.cleanTaskGroupContext(pipelineLocation);
}
+    /**
+     * Records a pending cleanup for a pipeline that ended as {@code CANCELED}, or as {@code
+     * FINISHED} without a savepoint end. The record lets its metrics and TaskGroupContexts be
+     * released later even if the immediate cleanup fails.
+     *
+     * <p>If a record already exists for the pipeline, the new information is merged into it with
+     * a bounded optimistic compare-and-replace loop. If every attempt loses the race, the merged
+     * record is written unconditionally. At worst this discards concurrent cleanup progress,
+     * which then repeats on the next pass (assumed idempotent; TODO confirm).
+     *
+     * @param pipelineLocation the pipeline that reached an end state; ignored when null
+     * @param pipelineStatus the pipeline's final status; ignored when null
+     * @throws RuntimeException wrapping {@link InterruptedException} if interrupted between
+     *     attempts (the interrupt status is restored first)
+     */
+    public void enqueuePipelineCleanupIfNeeded(
+            PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
+        if (pipelineLocation == null || pipelineStatus == null) {
+            return;
+        }
+        boolean savepointEnd =
+                PipelineStatus.FINISHED.equals(pipelineStatus)
+                        && checkpointManager != null
+                        && checkpointManager.isPipelineSavePointEnd(pipelineLocation);
+        boolean shouldCleanup =
+                PipelineStatus.CANCELED.equals(pipelineStatus)
+                        || (PipelineStatus.FINISHED.equals(pipelineStatus) && !savepointEnd);
+        if (!shouldCleanup) {
+            return;
+        }
+
+        // Snapshot the worker address of every task group so it can be targeted later.
+        Map<TaskGroupLocation, SlotProfile> slotProfileMap =
+                ownedSlotProfilesIMap.get(pipelineLocation);
+        Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+        if (slotProfileMap != null) {
+            slotProfileMap.forEach(
+                    (taskGroupLocation, slotProfile) ->
+                            taskGroups.put(taskGroupLocation, slotProfile.getWorker()));
+        }
+
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+        long now = System.currentTimeMillis();
+        PipelineCleanupRecord newRecord =
+                new PipelineCleanupRecord(
+                        pipelineLocation,
+                        pipelineStatus,
+                        savepointEnd,
+                        taskGroups,
+                        Collections.emptySet(),
+                        false,
+                        now,
+                        0,
+                        0);
+
+        // Bounded so that a replace() that keeps failing cannot hang the pipeline end path.
+        final int maxCasAttempts = 100;
+        PipelineCleanupRecord toStore = newRecord;
+        for (int attempt = 0; attempt < maxCasAttempts; attempt++) {
+            PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
+            if (existing == null) {
+                PipelineCleanupRecord prev =
+                        pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
+                if (prev == null) {
+                    return;
+                }
+                existing = prev;
+            }
+            toStore = existing.mergeFrom(newRecord);
+            if (toStore.equals(existing)) {
+                return;
+            }
+            if (pendingCleanupIMap.replace(pipelineLocation, existing, toStore)) {
+                return;
+            }
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+        // Last resort: keep the merged information rather than spinning forever.
+        pendingCleanupIMap.set(pipelineLocation, toStore);
+    }
+
public void removeMetricsContext(
PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
if ((pipelineStatus.equals(PipelineStatus.FINISHED)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecord.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecord.java
new file mode 100644
index 0000000000..6ace1cbb0b
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecord.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master.cleanup;
+
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Pending-cleanup bookkeeping for one pipeline that reached an end state. Records are stored in
+ * a Hazelcast IMap keyed by {@link PipelineLocation}.
+ *
+ * <p>Fields:
+ *
+ * <ul>
+ *   <li>{@code taskGroups} maps each task group to the worker address it ran on.
+ *   <li>{@code cleanedTaskGroups} holds the task groups whose context is already cleaned.
+ *   <li>{@code metricsImapCleaned} records whether the pipeline's metrics were removed.
+ *   <li>{@code createTimeMillis}, {@code lastAttemptTimeMillis} and {@code attemptCount}
+ *       describe the cleanup attempts made so far.
+ * </ul>
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PipelineCleanupRecord implements IdentifiedDataSerializable {
+
+    private PipelineLocation pipelineLocation;
+    private PipelineStatus finalStatus;
+    private boolean savepointEnd;
+
+    private Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+    private Set<TaskGroupLocation> cleanedTaskGroups = new HashSet<>();
+    private boolean metricsImapCleaned;
+
+    private long createTimeMillis;
+    private long lastAttemptTimeMillis;
+    private int attemptCount;
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.PIPELINE_CLEANUP_RECORD_TYPE;
+    }
+
+    /**
+     * Serializes the record.
+     *
+     * <p>Each collection is written as its size followed by its elements in iteration order. A
+     * size of {@code -1} encodes a null collection.
+     */
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeObject(pipelineLocation);
+        out.writeString(finalStatus == null ? null : finalStatus.name());
+        out.writeBoolean(savepointEnd);
+
+        if (taskGroups == null) {
+            out.writeInt(-1);
+        } else {
+            out.writeInt(taskGroups.size());
+            for (Map.Entry<TaskGroupLocation, Address> entry : taskGroups.entrySet()) {
+                out.writeObject(entry.getKey());
+                out.writeObject(entry.getValue());
+            }
+        }
+
+        if (cleanedTaskGroups == null) {
+            out.writeInt(-1);
+        } else {
+            out.writeInt(cleanedTaskGroups.size());
+            for (TaskGroupLocation taskGroupLocation : cleanedTaskGroups) {
+                out.writeObject(taskGroupLocation);
+            }
+        }
+
+        out.writeBoolean(metricsImapCleaned);
+        out.writeLong(createTimeMillis);
+        out.writeLong(lastAttemptTimeMillis);
+        out.writeInt(attemptCount);
+    }
+
+    /**
+     * Deserializes a record written by {@link #writeData}.
+     *
+     * <p>Collections are rebuilt as insertion-ordered {@link LinkedHashMap} / {@link
+     * LinkedHashSet}, so serializing the result again yields exactly the bytes that were read.
+     * This matters because records read from the IMap are passed back as the expected value of
+     * conditional operations ({@code replace(key, expected, update)}, {@code remove(key,
+     * value)}). Hazelcast compares those values in serialized form under the BINARY in-memory
+     * format. A {@link HashMap} presized differently from the writer's may iterate, and therefore
+     * serialize, in another order, so the conditional operation would fail even though the
+     * contents are equal.
+     */
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        pipelineLocation = in.readObject();
+        String statusName = in.readString();
+        finalStatus = statusName == null ? null : PipelineStatus.valueOf(statusName);
+        savepointEnd = in.readBoolean();
+
+        int taskGroupsSize = in.readInt();
+        if (taskGroupsSize >= 0) {
+            taskGroups = new LinkedHashMap<>(taskGroupsSize);
+            for (int i = 0; i < taskGroupsSize; i++) {
+                TaskGroupLocation taskGroupLocation = in.readObject();
+                Address address = in.readObject();
+                taskGroups.put(taskGroupLocation, address);
+            }
+        } else {
+            taskGroups = null;
+        }
+
+        int cleanedTaskGroupsSize = in.readInt();
+        if (cleanedTaskGroupsSize >= 0) {
+            cleanedTaskGroups = new LinkedHashSet<>(cleanedTaskGroupsSize);
+            for (int i = 0; i < cleanedTaskGroupsSize; i++) {
+                cleanedTaskGroups.add(in.readObject());
+            }
+        } else {
+            cleanedTaskGroups = null;
+        }
+
+        metricsImapCleaned = in.readBoolean();
+        createTimeMillis = in.readLong();
+        lastAttemptTimeMillis = in.readLong();
+        attemptCount = in.readInt();
+    }
+
+    /**
+     * Tells whether all cleanup work is done.
+     *
+     * @return true once the metrics are removed and every key of {@code taskGroups} is in {@code
+     *     cleanedTaskGroups}; false if either collection is null
+     */
+    public boolean isCleaned() {
+        return metricsImapCleaned
+                && taskGroups != null
+                && cleanedTaskGroups != null
+                && cleanedTaskGroups.containsAll(taskGroups.keySet());
+    }
+
+    /**
+     * Returns a new record combining this record with {@code other}; neither input is modified.
+     *
+     * <ul>
+     *   <li>Location, final status and create time come from this record when set (non-null /
+     *       non-zero), otherwise from {@code other}.
+     *   <li>Task groups and cleaned task groups are unioned; on duplicate task-group keys,
+     *       {@code other}'s address wins.
+     *   <li>Boolean flags are OR-ed.
+     *   <li>Last attempt time and attempt count take the maximum.
+     * </ul>
+     *
+     * @param other the record to merge in; may be null
+     * @return the merged record, or {@code this} when {@code other} is null
+     */
+    public PipelineCleanupRecord mergeFrom(PipelineCleanupRecord other) {
+        if (other == null) {
+            return this;
+        }
+        Map<TaskGroupLocation, Address> mergedTaskGroups = new HashMap<>();
+        if (this.taskGroups != null) {
+            mergedTaskGroups.putAll(this.taskGroups);
+        }
+        if (other.taskGroups != null) {
+            mergedTaskGroups.putAll(other.taskGroups);
+        }
+
+        Set<TaskGroupLocation> mergedCleaned = new HashSet<>();
+        if (this.cleanedTaskGroups != null) {
+            mergedCleaned.addAll(this.cleanedTaskGroups);
+        }
+        if (other.cleanedTaskGroups != null) {
+            mergedCleaned.addAll(other.cleanedTaskGroups);
+        }
+
+        return new PipelineCleanupRecord(
+                this.pipelineLocation != null ? this.pipelineLocation : other.pipelineLocation,
+                this.finalStatus != null ? this.finalStatus : other.finalStatus,
+                this.savepointEnd || other.savepointEnd,
+                mergedTaskGroups,
+                mergedCleaned,
+                this.metricsImapCleaned || other.metricsImapCleaned,
+                this.createTimeMillis != 0 ? this.createTimeMillis : other.createTimeMillis,
+                Math.max(this.lastAttemptTimeMillis, other.lastAttemptTimeMillis),
+                Math.max(this.attemptCount, other.attemptCount));
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index 5e5c12d706..b1a3ebc760 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.serializable;
import
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetPendingJobsOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
@@ -56,6 +57,8 @@ public class ResourceDataSerializerHook implements
DataSerializerHook {
public static final int GET_PENDING_JOBS_TYPE = 10;
+ /** Class id under which {@link PipelineCleanupRecord} is created by this factory. */
+ public static final int PIPELINE_CLEANUP_RECORD_TYPE = 11;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
@@ -96,6 +99,8 @@ public class ResourceDataSerializerHook implements
DataSerializerHook {
return new GetOverviewOperation();
case GET_PENDING_JOBS_TYPE:
return new GetPendingJobsOperation();
+ case PIPELINE_CLEANUP_RECORD_TYPE:
+ return new PipelineCleanupRecord();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServicePipelineCleanupTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServicePipelineCleanupTest.java
new file mode 100644
index 0000000000..07f32459d1
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServicePipelineCleanupTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.map.IMap;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Tests for the coordinator's pending pipeline cleanup. Each test seeds the metrics IMap, the
+ * running-job-state IMap and the pending-cleanup IMap directly. It then runs a single pass via
+ * runPendingPipelineCleanupOnce() and inspects the resulting state.
+ */
+class CoordinatorServicePipelineCleanupTest extends
AbstractSeaTunnelServerTest {
+
+ /** A FINISHED record with no task groups: one pass removes its metrics and the record, but not another pipeline's metrics. */
+ @Test
+ void testCleanupRemovesMetricsAndRecordWhenNoTaskGroups() {
+ CoordinatorService coordinatorService = server.getCoordinatorService();
+ awaitCoordinatorActive(coordinatorService);
+
+ long jobId = System.currentTimeMillis();
+ PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+ PipelineLocation otherPipelineLocation = new PipelineLocation(jobId +
1, 1);
+
+ upsertMetricsForPipeline(pipelineLocation);
+ upsertMetricsForPipeline(otherPipelineLocation);
+ Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+ Assertions.assertTrue(hasMetricsForPipeline(otherPipelineLocation));
+
+ IMap<Object, Object> runningJobStateIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+ runningJobStateIMap.put(pipelineLocation, PipelineStatus.FINISHED);
+
+ IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+ pendingCleanupIMap.put(
+ pipelineLocation,
+ new PipelineCleanupRecord(
+ pipelineLocation,
+ PipelineStatus.FINISHED,
+ false,
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ false,
+ System.currentTimeMillis(),
+ 0L,
+ 0));
+
+ coordinatorService.runPendingPipelineCleanupOnce();
+
+ Assertions.assertFalse(hasMetricsForPipeline(pipelineLocation));
+ Assertions.assertTrue(hasMetricsForPipeline(otherPipelineLocation));
+
Assertions.assertFalse(pendingCleanupIMap.containsKey(pipelineLocation));
+ }
+
+ /** While the running-job-state IMap reports RUNNING, the record is left untouched (attemptCount stays 0) and metrics remain. */
+ @Test
+ void testSkipCleanupWhenPipelineNotEndState() {
+ CoordinatorService coordinatorService = server.getCoordinatorService();
+ awaitCoordinatorActive(coordinatorService);
+
+ long jobId = System.currentTimeMillis();
+ PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+
+ upsertMetricsForPipeline(pipelineLocation);
+ Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+
+ IMap<Object, Object> runningJobStateIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+ runningJobStateIMap.put(pipelineLocation, PipelineStatus.RUNNING);
+
+ IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+ PipelineCleanupRecord record =
+ new PipelineCleanupRecord(
+ pipelineLocation,
+ PipelineStatus.FINISHED,
+ false,
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ false,
+ System.currentTimeMillis(),
+ 0L,
+ 0);
+ pendingCleanupIMap.put(pipelineLocation, record);
+
+ coordinatorService.runPendingPipelineCleanupOnce();
+
+ PipelineCleanupRecord after = pendingCleanupIMap.get(pipelineLocation);
+ Assertions.assertNotNull(after);
+ Assertions.assertEquals(0, after.getAttemptCount());
+ Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+ }
+
+ /** A record flagged savepointEnd no longer qualifies: it is dropped without cleaning the pipeline's metrics. */
+ @Test
+ void testRemoveRecordWhenShouldCleanupIsFalse() {
+ CoordinatorService coordinatorService = server.getCoordinatorService();
+ awaitCoordinatorActive(coordinatorService);
+
+ long jobId = System.currentTimeMillis();
+ PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+ upsertMetricsForPipeline(pipelineLocation);
+ Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+
+ IMap<Object, Object> runningJobStateIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+ runningJobStateIMap.put(pipelineLocation, PipelineStatus.FINISHED);
+
+ IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+ pendingCleanupIMap.put(
+ pipelineLocation,
+ new PipelineCleanupRecord(
+ pipelineLocation,
+ PipelineStatus.FINISHED,
+ true,
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ false,
+ System.currentTimeMillis(),
+ 0L,
+ 0));
+
+ coordinatorService.runPendingPipelineCleanupOnce();
+
+
Assertions.assertFalse(pendingCleanupIMap.containsKey(pipelineLocation));
+ Assertions.assertTrue(
+ hasMetricsForPipeline(pipelineLocation),
+ "Should not clean metrics when record is removed due to
shouldCleanup=false");
+ }
+
+ /** A task group with a null worker address cannot be cleaned: the record is kept with attemptCount 1 and metricsImapCleaned set, but is not fully cleaned. */
+ @Test
+ void testCleanupUpdatesRecordAndKeepsItWhenTaskGroupCannotBeCleaned() {
+ CoordinatorService coordinatorService = server.getCoordinatorService();
+ awaitCoordinatorActive(coordinatorService);
+
+ long jobId = System.currentTimeMillis();
+ PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+ upsertMetricsForPipeline(pipelineLocation);
+ Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+
+ IMap<Object, Object> runningJobStateIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+ runningJobStateIMap.put(pipelineLocation, PipelineStatus.CANCELED);
+
+ TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobId, 1,
1L);
+ Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+ taskGroups.put(taskGroupLocation, null);
+
+ IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+ pendingCleanupIMap.put(
+ pipelineLocation,
+ new PipelineCleanupRecord(
+ pipelineLocation,
+ PipelineStatus.CANCELED,
+ false,
+ taskGroups,
+ new HashSet<>(),
+ false,
+ System.currentTimeMillis(),
+ 0L,
+ 0));
+
+ coordinatorService.runPendingPipelineCleanupOnce();
+
+ PipelineCleanupRecord updated =
pendingCleanupIMap.get(pipelineLocation);
+ Assertions.assertNotNull(updated);
+ Assertions.assertEquals(1, updated.getAttemptCount());
+ Assertions.assertTrue(updated.isMetricsImapCleaned());
+ Assertions.assertFalse(updated.isCleaned());
+
Assertions.assertFalse(updated.getCleanedTaskGroups().contains(taskGroupLocation));
+ Assertions.assertFalse(hasMetricsForPipeline(pipelineLocation));
+ }
+
+ /** A task group on the local member is cleaned in one pass, after which the record is removed. */
+ @Test
+ void testCleanupRemovesRecordWhenAllTaskGroupsCleaned() {
+ CoordinatorService coordinatorService = server.getCoordinatorService();
+ awaitCoordinatorActive(coordinatorService);
+
+ long jobId = System.currentTimeMillis();
+ PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+ upsertMetricsForPipeline(pipelineLocation);
+ Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+
+ IMap<Object, Object> runningJobStateIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+ runningJobStateIMap.put(pipelineLocation, PipelineStatus.CANCELED);
+
+ Address localAddress =
instance.getCluster().getLocalMember().getAddress();
+ TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobId, 1,
1L);
+ Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+ taskGroups.put(taskGroupLocation, localAddress);
+
+ IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+ pendingCleanupIMap.put(
+ pipelineLocation,
+ new PipelineCleanupRecord(
+ pipelineLocation,
+ PipelineStatus.CANCELED,
+ false,
+ taskGroups,
+ new HashSet<>(),
+ false,
+ System.currentTimeMillis(),
+ 0L,
+ 0));
+
+ coordinatorService.runPendingPipelineCleanupOnce();
+
+ Assertions.assertFalse(hasMetricsForPipeline(pipelineLocation));
+
Assertions.assertFalse(pendingCleanupIMap.containsKey(pipelineLocation));
+ }
+
+ /** Registers one metrics context for task (0, 0) of task group 1 of the given pipeline via server.updateMetrics. */
+ private void upsertMetricsForPipeline(PipelineLocation pipelineLocation) {
+ TaskGroupLocation taskGroupLocation =
+ new TaskGroupLocation(
+ pipelineLocation.getJobId(),
pipelineLocation.getPipelineId(), 1L);
+ TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 0, 0);
+
+ Map<TaskLocation, SeaTunnelMetricsContext> local = new HashMap<>();
+ local.put(taskLocation, new SeaTunnelMetricsContext());
+ server.updateMetrics(local);
+ }
+
+ /** Returns true if any entry of the running-job-metrics IMap holds a task location belonging to the pipeline. */
+ private boolean hasMetricsForPipeline(PipelineLocation pipelineLocation) {
+ IMap<Long, Map<TaskLocation, SeaTunnelMetricsContext>> metricsIMap =
+
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ return metricsIMap.entrySet().stream()
+ .flatMap(entry -> entry.getValue().keySet().stream())
+ .anyMatch(
+ taskLocation ->
+ pipelineLocation.equals(
+
taskLocation.getTaskGroupLocation().getPipelineLocation()));
+ }
+
+ /** Waits up to 30 seconds for the coordinator to report itself active, since the cleanup pass is a no-op otherwise. */
+ private void awaitCoordinatorActive(CoordinatorService coordinatorService)
{
+ await().atMost(30, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
Assertions.assertTrue(coordinatorService.isCoordinatorActive()));
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordHazelcastSerializationTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordHazelcastSerializationTest.java
new file mode 100644
index 0000000000..115730ce94
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordHazelcastSerializationTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master.cleanup;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.map.IMap;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+class PipelineCleanupRecordHazelcastSerializationTest {
+
+ @Test
+ void testPutAndGetAcrossMembers() {
+ String clusterName =
+ TestUtils.getClusterName(
+
"PipelineCleanupRecordHazelcastSerializationTest_testPutAndGetAcrossMembers");
+ HazelcastInstanceImpl instance1 =
+ SeaTunnelServerStarter.createHazelcastInstance(clusterName);
+ HazelcastInstanceImpl instance2 =
+ SeaTunnelServerStarter.createHazelcastInstance(clusterName);
+ try {
+ await().atMost(30, TimeUnit.SECONDS)
+ .until(() -> instance1.getCluster().getMembers().size() ==
2);
+
+ PipelineLocation pipelineLocation = new PipelineLocation(1L, 1);
+ TaskGroupLocation taskGroupLocation = new TaskGroupLocation(1L, 1,
1L);
+ Address workerAddress =
instance1.getCluster().getLocalMember().getAddress();
+ Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+ taskGroups.put(taskGroupLocation, workerAddress);
+
+ PipelineCleanupRecord record =
+ new PipelineCleanupRecord(
+ pipelineLocation,
+ PipelineStatus.CANCELED,
+ false,
+ taskGroups,
+ new
HashSet<>(Collections.singleton(taskGroupLocation)),
+ true,
+ 100L,
+ 200L,
+ 3);
+
+ IMap<PipelineLocation, PipelineCleanupRecord> map1 =
+ instance1.getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+ IMap<PipelineLocation, PipelineCleanupRecord> map2 =
+ instance2.getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+
+ map1.put(pipelineLocation, record);
+
+ await().atMost(30, TimeUnit.SECONDS).until(() ->
map2.containsKey(pipelineLocation));
+
+ PipelineCleanupRecord read = map2.get(pipelineLocation);
+ Assertions.assertNotNull(read);
+ Assertions.assertEquals(pipelineLocation,
read.getPipelineLocation());
+ Assertions.assertEquals(PipelineStatus.CANCELED,
read.getFinalStatus());
+ Assertions.assertFalse(read.isSavepointEnd());
+ Assertions.assertTrue(read.isMetricsImapCleaned());
+ Assertions.assertEquals(100L, read.getCreateTimeMillis());
+ Assertions.assertEquals(200L, read.getLastAttemptTimeMillis());
+ Assertions.assertEquals(3, read.getAttemptCount());
+ Assertions.assertEquals(workerAddress,
read.getTaskGroups().get(taskGroupLocation));
+
Assertions.assertTrue(read.getCleanedTaskGroups().contains(taskGroupLocation));
+ Assertions.assertTrue(read.isCleaned());
+ } finally {
+ instance1.shutdown();
+ instance2.shutdown();
+ }
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordTest.java
new file mode 100644
index 0000000000..b7f982a742
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master.cleanup;
+
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Unit tests for {@code PipelineCleanupRecord#isCleaned} and {@code PipelineCleanupRecord#mergeFrom}. */
+class PipelineCleanupRecordTest {
+
+    /** A record that tracks no task groups and has its metrics flag set reports itself cleaned. */
+    @Test
+    void testIsCleanedWithEmptyTaskGroups() {
+        PipelineCleanupRecord emptyRecord =
+                new PipelineCleanupRecord(
+                        new PipelineLocation(1L, 1),
+                        PipelineStatus.FINISHED,
+                        false,
+                        Collections.emptyMap(),
+                        Collections.emptySet(),
+                        true,
+                        System.currentTimeMillis(),
+                        0L,
+                        0);
+
+        Assertions.assertTrue(emptyRecord.isCleaned());
+    }
+
+    /**
+     * Walks a two-task-group record from "nothing cleaned" to "everything cleaned" and checks that
+     * {@code isCleaned()} only flips once the metrics flag and both task groups are cleaned.
+     */
+    @Test
+    void testIsCleanedRequiresMetricsCleanedAndAllTaskGroupsCleaned() {
+        TaskGroupLocation firstGroup = new TaskGroupLocation(1L, 1, 1L);
+        TaskGroupLocation secondGroup = new TaskGroupLocation(1L, 1, 2L);
+        Map<TaskGroupLocation, Address> taskGroups =
+                taskGroupsWithNullAddresses(firstGroup, secondGroup);
+
+        PipelineCleanupRecord cleanupRecord =
+                new PipelineCleanupRecord(
+                        new PipelineLocation(1L, 1),
+                        PipelineStatus.CANCELED,
+                        false,
+                        taskGroups,
+                        new HashSet<>(),
+                        false,
+                        System.currentTimeMillis(),
+                        0L,
+                        0);
+
+        // Neither metrics nor task groups cleaned yet.
+        Assertions.assertFalse(cleanupRecord.isCleaned());
+
+        // Metrics cleaned, but both task groups still pending.
+        cleanupRecord.setMetricsImapCleaned(true);
+        Assertions.assertFalse(cleanupRecord.isCleaned());
+
+        // Only one of the two task groups cleaned.
+        cleanupRecord.setCleanedTaskGroups(Collections.singleton(firstGroup));
+        Assertions.assertFalse(cleanupRecord.isCleaned());
+
+        // Every tracked task group cleaned.
+        cleanupRecord.setCleanedTaskGroups(new HashSet<>(taskGroups.keySet()));
+        Assertions.assertTrue(cleanupRecord.isCleaned());
+    }
+
+    /**
+     * Merges a record for pipeline 2 into a record for pipeline 1 and checks which values the
+     * merged result carries.
+     */
+    @Test
+    void testMergeFromPrefersNonNullFieldsAndUnionsCollections() {
+        PipelineLocation basePipeline = new PipelineLocation(1L, 1);
+        TaskGroupLocation firstGroup = new TaskGroupLocation(1L, 1, 1L);
+        TaskGroupLocation secondGroup = new TaskGroupLocation(1L, 1, 2L);
+
+        PipelineCleanupRecord base =
+                new PipelineCleanupRecord(
+                        basePipeline,
+                        PipelineStatus.FINISHED,
+                        false,
+                        taskGroupsWithNullAddresses(firstGroup),
+                        new HashSet<>(Collections.singleton(firstGroup)),
+                        false,
+                        100L,
+                        200L,
+                        1);
+        PipelineCleanupRecord other =
+                new PipelineCleanupRecord(
+                        new PipelineLocation(1L, 2),
+                        PipelineStatus.CANCELED,
+                        true,
+                        taskGroupsWithNullAddresses(secondGroup),
+                        new HashSet<>(Collections.singleton(secondGroup)),
+                        true,
+                        300L,
+                        400L,
+                        3);
+
+        PipelineCleanupRecord merged = base.mergeFrom(other);
+
+        // Pipeline location and final status are expected to stay those of the receiver.
+        Assertions.assertEquals(basePipeline, merged.getPipelineLocation());
+        Assertions.assertEquals(PipelineStatus.FINISHED, merged.getFinalStatus());
+
+        // Both flags are set only on the argument and must be set on the result.
+        Assertions.assertTrue(merged.isSavepointEnd());
+        Assertions.assertTrue(merged.isMetricsImapCleaned());
+
+        // Task groups and cleaned task groups contain the entries of both inputs.
+        Assertions.assertEquals(2, merged.getTaskGroups().size());
+        Assertions.assertTrue(merged.getTaskGroups().containsKey(firstGroup));
+        Assertions.assertTrue(merged.getTaskGroups().containsKey(secondGroup));
+        Assertions.assertEquals(2, merged.getCleanedTaskGroups().size());
+        Assertions.assertTrue(merged.getCleanedTaskGroups().contains(firstGroup));
+        Assertions.assertTrue(merged.getCleanedTaskGroups().contains(secondGroup));
+
+        // Expected: the earlier create time (100 vs 300), the later attempt time (200 vs 400),
+        // and the larger attempt count (1 vs 3).
+        Assertions.assertEquals(100L, merged.getCreateTimeMillis());
+        Assertions.assertEquals(400L, merged.getLastAttemptTimeMillis());
+        Assertions.assertEquals(3, merged.getAttemptCount());
+    }
+
+    /** Builds a mutable {@link HashMap} that maps each given task group to a {@code null} address. */
+    private static Map<TaskGroupLocation, Address> taskGroupsWithNullAddresses(
+            TaskGroupLocation... locations) {
+        Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+        for (TaskGroupLocation location : locations) {
+            taskGroups.put(location, null);
+        }
+        return taskGroups;
+    }
+}