This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7385f4d6ae [Fix][Zeta] Fix memory leak caused by failed pipeline 
cleanup after unstable worker communication (#10418)
7385f4d6ae is described below

commit 7385f4d6ae43e38835cc53f78dc05c6f112cddfa
Author: Jast <[email protected]>
AuthorDate: Wed Mar 18 23:27:07 2026 +0800

    [Fix][Zeta] Fix memory leak caused by failed pipeline cleanup after 
unstable worker communication (#10418)
---
 .../apache/seatunnel/engine/common/Constant.java   |   2 +
 .../engine/server/CoordinatorService.java          | 190 ++++++++++++++-
 .../engine/server/dag/physical/SubPlan.java        |   2 +
 .../seatunnel/engine/server/master/JobMaster.java  |  66 +++++
 .../master/cleanup/PipelineCleanupRecord.java      | 174 +++++++++++++
 .../serializable/ResourceDataSerializerHook.java   |   5 +
 .../CoordinatorServicePipelineCleanupTest.java     | 270 +++++++++++++++++++++
 ...ineCleanupRecordHazelcastSerializationTest.java | 101 ++++++++
 .../master/cleanup/PipelineCleanupRecordTest.java  | 147 +++++++++++
 9 files changed, 956 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index eadc73005f..da8c8abd09 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -59,6 +59,8 @@ public class Constant {
 
     public static final String IMAP_RUNNING_JOB_METRICS = 
"engine_runningJobMetrics";
 
+    public static final String IMAP_PENDING_PIPELINE_CLEANUP = 
"engine_pendingPipelineCleanup";
+
     public static final String IMAP_CHECKPOINT_MONITOR = 
"engine_checkpoint_monitor";
 
     public static final String IMAP_CONNECTOR_JAR_REF_COUNTERS = 
"engine_connectorJarRefCounters";
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 77c281a168..f4273ed497 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -63,6 +63,7 @@ import 
org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.master.JobHistoryService;
 import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
 import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
 import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import 
org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
@@ -70,6 +71,7 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import 
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
+import 
org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation;
 import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
 import org.apache.seatunnel.engine.server.telemetry.metrics.entity.JobCounter;
 import 
org.apache.seatunnel.engine.server.telemetry.metrics.entity.ThreadPoolStatus;
@@ -110,6 +112,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap;
 
 public class CoordinatorService {
+    private static final int PIPELINE_CLEANUP_INTERVAL_SECONDS = 60;
     private final NodeEngineImpl nodeEngine;
     private final ILogger logger;
 
@@ -175,6 +178,8 @@ public class CoordinatorService {
 
     private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> 
metricsImap;
 
+    private IMap<PipelineLocation, PipelineCleanupRecord> 
pendingPipelineCleanupIMap;
+
     /** If this node is a master node */
     private volatile boolean isActive = false;
 
@@ -184,6 +189,8 @@ public class CoordinatorService {
 
     private final ScheduledExecutorService masterActiveListener;
 
+    private final ScheduledExecutorService pipelineCleanupScheduler;
+
     private final EngineConfig engineConfig;
 
     private ConnectorPackageService connectorPackageService;
@@ -219,6 +226,16 @@ public class CoordinatorService {
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+        pipelineCleanupScheduler =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ThreadFactoryBuilder()
+                                .setNameFormat("seatunnel-pipeline-cleanup-%d")
+                                .build());
+        pipelineCleanupScheduler.scheduleAtFixedRate(
+                this::cleanupPendingPipelines,
+                PIPELINE_CLEANUP_INTERVAL_SECONDS,
+                PIPELINE_CLEANUP_INTERVAL_SECONDS,
+                TimeUnit.SECONDS);
         scheduleStrategy = engineConfig.getScheduleStrategy();
         isWaitStrategy = scheduleStrategy.equals(ScheduleStrategy.WAIT);
         logger.info("Start pending job schedule thread");
@@ -413,7 +430,8 @@ public class CoordinatorService {
         ownedSlotProfilesIMap =
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
         metricsImap = 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
-
+        pendingPipelineCleanupIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
         jobHistoryService =
                 new JobHistoryService(
                         nodeEngine,
@@ -448,6 +466,168 @@ public class CoordinatorService {
                                 
this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
     }
 
+    private void cleanupPendingPipelines() {
+        if (!isActive) {
+            return;
+        }
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                this.pendingPipelineCleanupIMap;
+        if (pendingCleanupIMap == null || pendingCleanupIMap.isEmpty()) {
+            return;
+        }
+
+        try {
+            for (Map.Entry<PipelineLocation, PipelineCleanupRecord> entry :
+                    pendingCleanupIMap.entrySet()) {
+                processPendingPipelineCleanup(entry.getKey(), 
entry.getValue());
+            }
+        } catch (HazelcastInstanceNotActiveException e) {
+            logger.warning(
+                    String.format(
+                            "Skip pending pipeline cleanup: hazelcast not 
active: %s",
+                            ExceptionUtils.getMessage(e)));
+        } catch (Throwable t) {
+            logger.warning(
+                    String.format(
+                            "Unexpected exception in pending pipeline cleanup: 
%s",
+                            ExceptionUtils.getMessage(t)),
+                    t);
+        }
+    }
+
+    private void processPendingPipelineCleanup(
+            PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
+        if (pipelineLocation == null || record == null) {
+            return;
+        }
+        if (!shouldCleanup(record)) {
+            removePendingCleanupRecord(pipelineLocation, record);
+            return;
+        }
+
+        PipelineStatus currentStatus = 
getPipelineStatusFromIMap(pipelineLocation);
+        if (currentStatus != null && !currentStatus.isEndState()) {
+            return;
+        }
+
+        long now = System.currentTimeMillis();
+        PipelineCleanupRecord updated = copy(record);
+        updated.setLastAttemptTimeMillis(now);
+        updated.setAttemptCount(record.getAttemptCount() + 1);
+
+        if (!updated.isMetricsImapCleaned() && 
cleanupPipelineMetrics(pipelineLocation)) {
+            updated.setMetricsImapCleaned(true);
+        }
+
+        Map<TaskGroupLocation, Address> taskGroups = updated.getTaskGroups();
+        if (taskGroups != null && !taskGroups.isEmpty()) {
+            for (Map.Entry<TaskGroupLocation, Address> taskGroup : 
taskGroups.entrySet()) {
+                TaskGroupLocation taskGroupLocation = taskGroup.getKey();
+                if (updated.getCleanedTaskGroups() != null
+                        && 
updated.getCleanedTaskGroups().contains(taskGroupLocation)) {
+                    continue;
+                }
+                Address workerAddress = taskGroup.getValue();
+                if (workerAddress == null
+                        || 
nodeEngine.getClusterService().getMember(workerAddress) == null) {
+                    continue;
+                }
+                try {
+                    NodeEngineUtil.sendOperationToMemberNode(
+                                    nodeEngine,
+                                    new 
CleanTaskGroupContextOperation(taskGroupLocation),
+                                    workerAddress)
+                            .get();
+                    updated.getCleanedTaskGroups().add(taskGroupLocation);
+                } catch (HazelcastInstanceNotActiveException e) {
+                    logger.warning(
+                            String.format(
+                                    "%s clean TaskGroupContext failed: %s",
+                                    taskGroupLocation, 
ExceptionUtils.getMessage(e)));
+                } catch (Exception e) {
+                    logger.warning(
+                            String.format(
+                                    "%s clean TaskGroupContext failed: %s",
+                                    taskGroupLocation, 
ExceptionUtils.getMessage(e)),
+                            e);
+                }
+            }
+        }
+
+        boolean replaced = 
pendingPipelineCleanupIMap.replace(pipelineLocation, record, updated);
+        if (!replaced) {
+            return;
+        }
+        if (updated.isCleaned()) {
+            pendingPipelineCleanupIMap.remove(pipelineLocation, updated);
+        }
+    }
+
+    private void removePendingCleanupRecord(
+            PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
+        try {
+            pendingPipelineCleanupIMap.remove(pipelineLocation, record);
+        } catch (Exception e) {
+            logger.warning(
+                    String.format(
+                            "Remove pending pipeline cleanup record failed: 
%s",
+                            ExceptionUtils.getMessage(e)),
+                    e);
+        }
+    }
+
+    private boolean shouldCleanup(PipelineCleanupRecord record) {
+        if (record == null || record.getFinalStatus() == null) {
+            return false;
+        }
+        if (record.isSavepointEnd()) {
+            return false;
+        }
+        return PipelineStatus.CANCELED.equals(record.getFinalStatus())
+                || PipelineStatus.FINISHED.equals(record.getFinalStatus());
+    }
+
+    private PipelineStatus getPipelineStatusFromIMap(PipelineLocation 
pipelineLocation) {
+        Object state =
+                runningJobStateIMap != null ? 
runningJobStateIMap.get(pipelineLocation) : null;
+        return state instanceof PipelineStatus ? (PipelineStatus) state : null;
+    }
+
+    private PipelineCleanupRecord copy(PipelineCleanupRecord record) {
+        Map<TaskGroupLocation, Address> taskGroups =
+                record.getTaskGroups() == null
+                        ? Collections.emptyMap()
+                        : new HashMap<>(record.getTaskGroups());
+        Set<TaskGroupLocation> cleanedTaskGroups =
+                record.getCleanedTaskGroups() == null
+                        ? new HashSet<>()
+                        : new HashSet<>(record.getCleanedTaskGroups());
+        return new PipelineCleanupRecord(
+                record.getPipelineLocation(),
+                record.getFinalStatus(),
+                record.isSavepointEnd(),
+                taskGroups,
+                cleanedTaskGroups,
+                record.isMetricsImapCleaned(),
+                record.getCreateTimeMillis(),
+                record.getLastAttemptTimeMillis(),
+                record.getAttemptCount());
+    }
+
+    private boolean cleanupPipelineMetrics(PipelineLocation pipelineLocation) {
+        try {
+            seaTunnelServer.removeMetrics(pipelineLocation);
+            return true;
+        } catch (Exception e) {
+            logger.warning(
+                    String.format(
+                            "Failed to remove metrics context for pipeline %s: 
%s",
+                            pipelineLocation, ExceptionUtils.getMessage(e)),
+                    e);
+            return false;
+        }
+    }
+
     private void restoreAllRunningJobFromMasterNodeSwitch() {
         List<Map.Entry<Long, JobInfo>> needRestoreFromMasterNodeSwitchJobs;
         try {
@@ -960,6 +1140,9 @@ public class CoordinatorService {
         if (masterActiveListener != null) {
             masterActiveListener.shutdownNow();
         }
+        if (pipelineCleanupScheduler != null) {
+            pipelineCleanupScheduler.shutdownNow();
+        }
         clearCoordinatorService();
     }
 
@@ -1224,6 +1407,11 @@ public class CoordinatorService {
         return metricsImap;
     }
 
+    @VisibleForTesting
+    void runPendingPipelineCleanupOnce() {
+        cleanupPendingPipelines();
+    }
+
     @VisibleForTesting
     public PeekBlockingQueue<PendingJobInfo> getPendingJobQueue() {
         return pendingJobQueue;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index ee54911142..5e9aa83487 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -307,6 +307,8 @@ public class SubPlan {
         try {
             RetryUtils.retryWithException(
                     () -> {
+                        jobMaster.enqueuePipelineCleanupIfNeeded(
+                                getPipelineLocation(), pipelineStatus);
                         
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
                         try {
                             
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index ee1edc826c..20aaf667b3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -67,6 +67,7 @@ import 
org.apache.seatunnel.engine.server.dag.physical.ResourceUtils;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
 import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
 import 
org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
@@ -896,6 +897,71 @@ public class JobMaster {
         this.cleanTaskGroupContext(pipelineLocation);
     }
 
+    public void enqueuePipelineCleanupIfNeeded(
+            PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
+        if (pipelineLocation == null || pipelineStatus == null) {
+            return;
+        }
+        boolean savepointEnd =
+                PipelineStatus.FINISHED.equals(pipelineStatus)
+                        && checkpointManager != null
+                        && 
checkpointManager.isPipelineSavePointEnd(pipelineLocation);
+        boolean shouldCleanup =
+                PipelineStatus.CANCELED.equals(pipelineStatus)
+                        || (PipelineStatus.FINISHED.equals(pipelineStatus) && 
!savepointEnd);
+        if (!shouldCleanup) {
+            return;
+        }
+
+        Map<TaskGroupLocation, SlotProfile> slotProfileMap =
+                ownedSlotProfilesIMap.get(pipelineLocation);
+        Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+        if (slotProfileMap != null) {
+            slotProfileMap.forEach(
+                    (taskGroupLocation, slotProfile) ->
+                            taskGroups.put(taskGroupLocation, 
slotProfile.getWorker()));
+        }
+
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+        long now = System.currentTimeMillis();
+        PipelineCleanupRecord newRecord =
+                new PipelineCleanupRecord(
+                        pipelineLocation,
+                        pipelineStatus,
+                        savepointEnd,
+                        taskGroups,
+                        Collections.emptySet(),
+                        false,
+                        now,
+                        0,
+                        0);
+
+        while (true) {
+            PipelineCleanupRecord existing = 
pendingCleanupIMap.get(pipelineLocation);
+            if (existing == null) {
+                PipelineCleanupRecord prev =
+                        pendingCleanupIMap.putIfAbsent(pipelineLocation, 
newRecord);
+                if (prev == null) {
+                    return;
+                }
+                existing = prev;
+            }
+            PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
+            if (merged.equals(existing)) {
+                return;
+            }
+            if (pendingCleanupIMap.replace(pipelineLocation, existing, 
merged)) {
+                return;
+            }
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     public void removeMetricsContext(
             PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
         if ((pipelineStatus.equals(PipelineStatus.FINISHED)
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecord.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecord.java
new file mode 100644
index 0000000000..6ace1cbb0b
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecord.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master.cleanup;
+
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import 
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PipelineCleanupRecord implements IdentifiedDataSerializable {
+
+    private PipelineLocation pipelineLocation;
+    private PipelineStatus finalStatus;
+    private boolean savepointEnd;
+
+    private Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+    private Set<TaskGroupLocation> cleanedTaskGroups = new HashSet<>();
+    private boolean metricsImapCleaned;
+
+    private long createTimeMillis;
+    private long lastAttemptTimeMillis;
+    private int attemptCount;
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.PIPELINE_CLEANUP_RECORD_TYPE;
+    }
+
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        out.writeObject(pipelineLocation);
+        out.writeString(finalStatus == null ? null : finalStatus.name());
+        out.writeBoolean(savepointEnd);
+
+        if (taskGroups == null) {
+            out.writeInt(-1);
+        } else {
+            out.writeInt(taskGroups.size());
+            for (Map.Entry<TaskGroupLocation, Address> entry : 
taskGroups.entrySet()) {
+                out.writeObject(entry.getKey());
+                out.writeObject(entry.getValue());
+            }
+        }
+
+        if (cleanedTaskGroups == null) {
+            out.writeInt(-1);
+        } else {
+            out.writeInt(cleanedTaskGroups.size());
+            for (TaskGroupLocation taskGroupLocation : cleanedTaskGroups) {
+                out.writeObject(taskGroupLocation);
+            }
+        }
+
+        out.writeBoolean(metricsImapCleaned);
+        out.writeLong(createTimeMillis);
+        out.writeLong(lastAttemptTimeMillis);
+        out.writeInt(attemptCount);
+    }
+
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        pipelineLocation = in.readObject();
+        String statusName = in.readString();
+        finalStatus = statusName == null ? null : 
PipelineStatus.valueOf(statusName);
+        savepointEnd = in.readBoolean();
+
+        int taskGroupsSize = in.readInt();
+        if (taskGroupsSize >= 0) {
+            taskGroups = new HashMap<>(taskGroupsSize);
+            for (int i = 0; i < taskGroupsSize; i++) {
+                TaskGroupLocation taskGroupLocation = in.readObject();
+                Address address = in.readObject();
+                taskGroups.put(taskGroupLocation, address);
+            }
+        } else {
+            taskGroups = null;
+        }
+
+        int cleanedTaskGroupsSize = in.readInt();
+        if (cleanedTaskGroupsSize >= 0) {
+            cleanedTaskGroups = new HashSet<>(cleanedTaskGroupsSize);
+            for (int i = 0; i < cleanedTaskGroupsSize; i++) {
+                cleanedTaskGroups.add(in.readObject());
+            }
+        } else {
+            cleanedTaskGroups = null;
+        }
+
+        metricsImapCleaned = in.readBoolean();
+        createTimeMillis = in.readLong();
+        lastAttemptTimeMillis = in.readLong();
+        attemptCount = in.readInt();
+    }
+
+    public boolean isCleaned() {
+        return metricsImapCleaned
+                && taskGroups != null
+                && cleanedTaskGroups != null
+                && cleanedTaskGroups.containsAll(taskGroups.keySet());
+    }
+
+    public PipelineCleanupRecord mergeFrom(PipelineCleanupRecord other) {
+        if (other == null) {
+            return this;
+        }
+        Map<TaskGroupLocation, Address> mergedTaskGroups = new HashMap<>();
+        if (this.taskGroups != null) {
+            mergedTaskGroups.putAll(this.taskGroups);
+        }
+        if (other.taskGroups != null) {
+            mergedTaskGroups.putAll(other.taskGroups);
+        }
+
+        Set<TaskGroupLocation> mergedCleaned = new HashSet<>();
+        if (this.cleanedTaskGroups != null) {
+            mergedCleaned.addAll(this.cleanedTaskGroups);
+        }
+        if (other.cleanedTaskGroups != null) {
+            mergedCleaned.addAll(other.cleanedTaskGroups);
+        }
+
+        PipelineCleanupRecord merged =
+                new PipelineCleanupRecord(
+                        this.pipelineLocation != null
+                                ? this.pipelineLocation
+                                : other.pipelineLocation,
+                        this.finalStatus != null ? this.finalStatus : 
other.finalStatus,
+                        this.savepointEnd || other.savepointEnd,
+                        mergedTaskGroups,
+                        mergedCleaned,
+                        this.metricsImapCleaned || other.metricsImapCleaned,
+                        this.createTimeMillis != 0 ? this.createTimeMillis : 
other.createTimeMillis,
+                        Math.max(this.lastAttemptTimeMillis, 
other.lastAttemptTimeMillis),
+                        Math.max(this.attemptCount, other.attemptCount));
+        return merged;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index 5e5c12d706..b1a3ebc760 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.serializable;
 
 import 
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetPendingJobsOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
@@ -56,6 +57,8 @@ public class ResourceDataSerializerHook implements 
DataSerializerHook {
 
     public static final int GET_PENDING_JOBS_TYPE = 10;
 
+    public static final int PIPELINE_CLEANUP_RECORD_TYPE = 11;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
@@ -96,6 +99,8 @@ public class ResourceDataSerializerHook implements 
DataSerializerHook {
                     return new GetOverviewOperation();
                 case GET_PENDING_JOBS_TYPE:
                     return new GetPendingJobsOperation();
+                case PIPELINE_CLEANUP_RECORD_TYPE:
+                    return new PipelineCleanupRecord();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServicePipelineCleanupTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServicePipelineCleanupTest.java
new file mode 100644
index 0000000000..07f32459d1
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServicePipelineCleanupTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.map.IMap;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+class CoordinatorServicePipelineCleanupTest extends 
AbstractSeaTunnelServerTest {
+
+    @Test
+    void testCleanupRemovesMetricsAndRecordWhenNoTaskGroups() {
+        CoordinatorService coordinatorService = server.getCoordinatorService();
+        awaitCoordinatorActive(coordinatorService);
+
+        long jobId = System.currentTimeMillis();
+        PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+        PipelineLocation otherPipelineLocation = new PipelineLocation(jobId + 
1, 1);
+
+        upsertMetricsForPipeline(pipelineLocation);
+        upsertMetricsForPipeline(otherPipelineLocation);
+        Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+        Assertions.assertTrue(hasMetricsForPipeline(otherPipelineLocation));
+
+        IMap<Object, Object> runningJobStateIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+        runningJobStateIMap.put(pipelineLocation, PipelineStatus.FINISHED);
+
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+        pendingCleanupIMap.put(
+                pipelineLocation,
+                new PipelineCleanupRecord(
+                        pipelineLocation,
+                        PipelineStatus.FINISHED,
+                        false,
+                        Collections.emptyMap(),
+                        Collections.emptySet(),
+                        false,
+                        System.currentTimeMillis(),
+                        0L,
+                        0));
+
+        coordinatorService.runPendingPipelineCleanupOnce();
+
+        Assertions.assertFalse(hasMetricsForPipeline(pipelineLocation));
+        Assertions.assertTrue(hasMetricsForPipeline(otherPipelineLocation));
+        
Assertions.assertFalse(pendingCleanupIMap.containsKey(pipelineLocation));
+    }
+
+    @Test
+    void testSkipCleanupWhenPipelineNotEndState() {
+        CoordinatorService coordinatorService = server.getCoordinatorService();
+        awaitCoordinatorActive(coordinatorService);
+
+        long jobId = System.currentTimeMillis();
+        PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+
+        upsertMetricsForPipeline(pipelineLocation);
+        Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+
+        IMap<Object, Object> runningJobStateIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+        runningJobStateIMap.put(pipelineLocation, PipelineStatus.RUNNING);
+
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+        PipelineCleanupRecord record =
+                new PipelineCleanupRecord(
+                        pipelineLocation,
+                        PipelineStatus.FINISHED,
+                        false,
+                        Collections.emptyMap(),
+                        Collections.emptySet(),
+                        false,
+                        System.currentTimeMillis(),
+                        0L,
+                        0);
+        pendingCleanupIMap.put(pipelineLocation, record);
+
+        coordinatorService.runPendingPipelineCleanupOnce();
+
+        PipelineCleanupRecord after = pendingCleanupIMap.get(pipelineLocation);
+        Assertions.assertNotNull(after);
+        Assertions.assertEquals(0, after.getAttemptCount());
+        Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+    }
+
+    @Test
+    void testRemoveRecordWhenShouldCleanupIsFalse() {
+        CoordinatorService coordinatorService = server.getCoordinatorService();
+        awaitCoordinatorActive(coordinatorService);
+
+        long jobId = System.currentTimeMillis();
+        PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+        upsertMetricsForPipeline(pipelineLocation);
+        Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+
+        IMap<Object, Object> runningJobStateIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+        runningJobStateIMap.put(pipelineLocation, PipelineStatus.FINISHED);
+
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+        pendingCleanupIMap.put(
+                pipelineLocation,
+                new PipelineCleanupRecord(
+                        pipelineLocation,
+                        PipelineStatus.FINISHED,
+                        true,
+                        Collections.emptyMap(),
+                        Collections.emptySet(),
+                        false,
+                        System.currentTimeMillis(),
+                        0L,
+                        0));
+
+        coordinatorService.runPendingPipelineCleanupOnce();
+
+        
Assertions.assertFalse(pendingCleanupIMap.containsKey(pipelineLocation));
+        Assertions.assertTrue(
+                hasMetricsForPipeline(pipelineLocation),
+                "Should not clean metrics when record is removed due to 
shouldCleanup=false");
+    }
+
+    @Test
+    void testCleanupUpdatesRecordAndKeepsItWhenTaskGroupCannotBeCleaned() {
+        CoordinatorService coordinatorService = server.getCoordinatorService();
+        awaitCoordinatorActive(coordinatorService);
+
+        long jobId = System.currentTimeMillis();
+        PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+        upsertMetricsForPipeline(pipelineLocation);
+        Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+
+        IMap<Object, Object> runningJobStateIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+        runningJobStateIMap.put(pipelineLocation, PipelineStatus.CANCELED);
+
+        TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobId, 1, 
1L);
+        Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+        taskGroups.put(taskGroupLocation, null);
+
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+        pendingCleanupIMap.put(
+                pipelineLocation,
+                new PipelineCleanupRecord(
+                        pipelineLocation,
+                        PipelineStatus.CANCELED,
+                        false,
+                        taskGroups,
+                        new HashSet<>(),
+                        false,
+                        System.currentTimeMillis(),
+                        0L,
+                        0));
+
+        coordinatorService.runPendingPipelineCleanupOnce();
+
+        PipelineCleanupRecord updated = 
pendingCleanupIMap.get(pipelineLocation);
+        Assertions.assertNotNull(updated);
+        Assertions.assertEquals(1, updated.getAttemptCount());
+        Assertions.assertTrue(updated.isMetricsImapCleaned());
+        Assertions.assertFalse(updated.isCleaned());
+        
Assertions.assertFalse(updated.getCleanedTaskGroups().contains(taskGroupLocation));
+        Assertions.assertFalse(hasMetricsForPipeline(pipelineLocation));
+    }
+
+    @Test
+    void testCleanupRemovesRecordWhenAllTaskGroupsCleaned() {
+        CoordinatorService coordinatorService = server.getCoordinatorService();
+        awaitCoordinatorActive(coordinatorService);
+
+        long jobId = System.currentTimeMillis();
+        PipelineLocation pipelineLocation = new PipelineLocation(jobId, 1);
+        upsertMetricsForPipeline(pipelineLocation);
+        Assertions.assertTrue(hasMetricsForPipeline(pipelineLocation));
+
+        IMap<Object, Object> runningJobStateIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+        runningJobStateIMap.put(pipelineLocation, PipelineStatus.CANCELED);
+
+        Address localAddress = 
instance.getCluster().getLocalMember().getAddress();
+        TaskGroupLocation taskGroupLocation = new TaskGroupLocation(jobId, 1, 
1L);
+        Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+        taskGroups.put(taskGroupLocation, localAddress);
+
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+        pendingCleanupIMap.put(
+                pipelineLocation,
+                new PipelineCleanupRecord(
+                        pipelineLocation,
+                        PipelineStatus.CANCELED,
+                        false,
+                        taskGroups,
+                        new HashSet<>(),
+                        false,
+                        System.currentTimeMillis(),
+                        0L,
+                        0));
+
+        coordinatorService.runPendingPipelineCleanupOnce();
+
+        Assertions.assertFalse(hasMetricsForPipeline(pipelineLocation));
+        
Assertions.assertFalse(pendingCleanupIMap.containsKey(pipelineLocation));
+    }
+
+    private void upsertMetricsForPipeline(PipelineLocation pipelineLocation) {
+        TaskGroupLocation taskGroupLocation =
+                new TaskGroupLocation(
+                        pipelineLocation.getJobId(), 
pipelineLocation.getPipelineId(), 1L);
+        TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 0, 0);
+
+        Map<TaskLocation, SeaTunnelMetricsContext> local = new HashMap<>();
+        local.put(taskLocation, new SeaTunnelMetricsContext());
+        server.updateMetrics(local);
+    }
+
+    private boolean hasMetricsForPipeline(PipelineLocation pipelineLocation) {
+        IMap<Long, Map<TaskLocation, SeaTunnelMetricsContext>> metricsIMap =
+                
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+        return metricsIMap.entrySet().stream()
+                .flatMap(entry -> entry.getValue().keySet().stream())
+                .anyMatch(
+                        taskLocation ->
+                                pipelineLocation.equals(
+                                        
taskLocation.getTaskGroupLocation().getPipelineLocation()));
+    }
+
+    private void awaitCoordinatorActive(CoordinatorService coordinatorService) 
{
+        await().atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> 
Assertions.assertTrue(coordinatorService.isCoordinatorActive()));
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordHazelcastSerializationTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordHazelcastSerializationTest.java
new file mode 100644
index 0000000000..115730ce94
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordHazelcastSerializationTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master.cleanup;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.map.IMap;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+class PipelineCleanupRecordHazelcastSerializationTest {
+
+    @Test
+    void testPutAndGetAcrossMembers() {
+        String clusterName =
+                TestUtils.getClusterName(
+                        
"PipelineCleanupRecordHazelcastSerializationTest_testPutAndGetAcrossMembers");
+        HazelcastInstanceImpl instance1 =
+                SeaTunnelServerStarter.createHazelcastInstance(clusterName);
+        HazelcastInstanceImpl instance2 =
+                SeaTunnelServerStarter.createHazelcastInstance(clusterName);
+        try {
+            await().atMost(30, TimeUnit.SECONDS)
+                    .until(() -> instance1.getCluster().getMembers().size() == 
2);
+
+            PipelineLocation pipelineLocation = new PipelineLocation(1L, 1);
+            TaskGroupLocation taskGroupLocation = new TaskGroupLocation(1L, 1, 
1L);
+            Address workerAddress = 
instance1.getCluster().getLocalMember().getAddress();
+            Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+            taskGroups.put(taskGroupLocation, workerAddress);
+
+            PipelineCleanupRecord record =
+                    new PipelineCleanupRecord(
+                            pipelineLocation,
+                            PipelineStatus.CANCELED,
+                            false,
+                            taskGroups,
+                            new 
HashSet<>(Collections.singleton(taskGroupLocation)),
+                            true,
+                            100L,
+                            200L,
+                            3);
+
+            IMap<PipelineLocation, PipelineCleanupRecord> map1 =
+                    instance1.getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+            IMap<PipelineLocation, PipelineCleanupRecord> map2 =
+                    instance2.getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
+
+            map1.put(pipelineLocation, record);
+
+            await().atMost(30, TimeUnit.SECONDS).until(() -> 
map2.containsKey(pipelineLocation));
+
+            PipelineCleanupRecord read = map2.get(pipelineLocation);
+            Assertions.assertNotNull(read);
+            Assertions.assertEquals(pipelineLocation, 
read.getPipelineLocation());
+            Assertions.assertEquals(PipelineStatus.CANCELED, 
read.getFinalStatus());
+            Assertions.assertFalse(read.isSavepointEnd());
+            Assertions.assertTrue(read.isMetricsImapCleaned());
+            Assertions.assertEquals(100L, read.getCreateTimeMillis());
+            Assertions.assertEquals(200L, read.getLastAttemptTimeMillis());
+            Assertions.assertEquals(3, read.getAttemptCount());
+            Assertions.assertEquals(workerAddress, 
read.getTaskGroups().get(taskGroupLocation));
+            
Assertions.assertTrue(read.getCleanedTaskGroups().contains(taskGroupLocation));
+            Assertions.assertTrue(read.isCleaned());
+        } finally {
+            instance1.shutdown();
+            instance2.shutdown();
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordTest.java
new file mode 100644
index 0000000000..b7f982a742
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/cleanup/PipelineCleanupRecordTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master.cleanup;
+
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+class PipelineCleanupRecordTest {
+
+    @Test
+    void testIsCleanedWithEmptyTaskGroups() {
+        PipelineCleanupRecord record =
+                new PipelineCleanupRecord(
+                        new PipelineLocation(1L, 1),
+                        PipelineStatus.FINISHED,
+                        false,
+                        Collections.emptyMap(),
+                        Collections.emptySet(),
+                        true,
+                        System.currentTimeMillis(),
+                        0L,
+                        0);
+        Assertions.assertTrue(record.isCleaned());
+    }
+
+    @Test
+    void testIsCleanedRequiresMetricsCleanedAndAllTaskGroupsCleaned() {
+        PipelineLocation pipelineLocation = new PipelineLocation(1L, 1);
+        TaskGroupLocation taskGroupLocation1 = new TaskGroupLocation(1L, 1, 
1L);
+        TaskGroupLocation taskGroupLocation2 = new TaskGroupLocation(1L, 1, 
2L);
+
+        Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
+        taskGroups.put(taskGroupLocation1, null);
+        taskGroups.put(taskGroupLocation2, null);
+
+        PipelineCleanupRecord record =
+                new PipelineCleanupRecord(
+                        pipelineLocation,
+                        PipelineStatus.CANCELED,
+                        false,
+                        taskGroups,
+                        new HashSet<>(),
+                        false,
+                        System.currentTimeMillis(),
+                        0L,
+                        0);
+
+        Assertions.assertFalse(record.isCleaned());
+
+        record.setMetricsImapCleaned(true);
+        Assertions.assertFalse(record.isCleaned());
+
+        record.setCleanedTaskGroups(Collections.singleton(taskGroupLocation1));
+        Assertions.assertFalse(record.isCleaned());
+
+        record.setCleanedTaskGroups(new HashSet<>(taskGroups.keySet()));
+        Assertions.assertTrue(record.isCleaned());
+    }
+
+    @Test
+    void testMergeFromPrefersNonNullFieldsAndUnionsCollections() {
+        PipelineLocation pipelineLocation1 = new PipelineLocation(1L, 1);
+        PipelineLocation pipelineLocation2 = new PipelineLocation(1L, 2);
+        TaskGroupLocation taskGroupLocation1 = new TaskGroupLocation(1L, 1, 
1L);
+        TaskGroupLocation taskGroupLocation2 = new TaskGroupLocation(1L, 1, 
2L);
+
+        Map<TaskGroupLocation, Address> taskGroups1 = new HashMap<>();
+        taskGroups1.put(taskGroupLocation1, null);
+        Set<TaskGroupLocation> cleaned1 = new HashSet<>();
+        cleaned1.add(taskGroupLocation1);
+
+        PipelineCleanupRecord record1 =
+                new PipelineCleanupRecord(
+                        pipelineLocation1,
+                        PipelineStatus.FINISHED,
+                        false,
+                        taskGroups1,
+                        cleaned1,
+                        false,
+                        100L,
+                        200L,
+                        1);
+
+        Map<TaskGroupLocation, Address> taskGroups2 = new HashMap<>();
+        taskGroups2.put(taskGroupLocation2, null);
+        Set<TaskGroupLocation> cleaned2 = new HashSet<>();
+        cleaned2.add(taskGroupLocation2);
+
+        PipelineCleanupRecord record2 =
+                new PipelineCleanupRecord(
+                        pipelineLocation2,
+                        PipelineStatus.CANCELED,
+                        true,
+                        taskGroups2,
+                        cleaned2,
+                        true,
+                        300L,
+                        400L,
+                        3);
+
+        PipelineCleanupRecord merged = record1.mergeFrom(record2);
+
+        Assertions.assertEquals(pipelineLocation1, 
merged.getPipelineLocation());
+        Assertions.assertEquals(PipelineStatus.FINISHED, 
merged.getFinalStatus());
+        Assertions.assertTrue(merged.isSavepointEnd());
+        Assertions.assertTrue(merged.isMetricsImapCleaned());
+
+        Assertions.assertEquals(2, merged.getTaskGroups().size());
+        
Assertions.assertTrue(merged.getTaskGroups().containsKey(taskGroupLocation1));
+        
Assertions.assertTrue(merged.getTaskGroups().containsKey(taskGroupLocation2));
+
+        Assertions.assertEquals(2, merged.getCleanedTaskGroups().size());
+        
Assertions.assertTrue(merged.getCleanedTaskGroups().contains(taskGroupLocation1));
+        
Assertions.assertTrue(merged.getCleanedTaskGroups().contains(taskGroupLocation2));
+
+        Assertions.assertEquals(100L, merged.getCreateTimeMillis());
+        Assertions.assertEquals(400L, merged.getLastAttemptTimeMillis());
+        Assertions.assertEquals(3, merged.getAttemptCount());
+    }
+}

Reply via email to the mailing list.