This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 17a416455e [Bugfix][zeta] Fix memory leak issues related to checkpoints (#5539)
17a416455e is described below

commit 17a416455ed09627558c85fd294e04648d0c5805
Author: ic4y <[email protected]>
AuthorDate: Mon Sep 25 19:17:52 2023 +0800

    [Bugfix][zeta] Fix memory leak issues related to checkpoints (#5539)
---
 .../server/checkpoint/CheckpointCoordinator.java   | 60 ++++++++++++----------
 .../server/checkpoint/PendingCheckpoint.java       | 13 +++++
 2 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 222f60a5cb..e3add95cb9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -64,6 +64,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -111,7 +112,7 @@ public class CheckpointCoordinator {
     private final Set<TaskLocation> readyToCloseStartingTask;
     private final ConcurrentHashMap<Long, PendingCheckpoint> 
pendingCheckpoints;
 
-    private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
+    private final ArrayDeque<String> completedCheckpointIds;
 
     private volatile CompletedCheckpoint latestCompletedCheckpoint = null;
 
@@ -165,7 +166,7 @@ public class CheckpointCoordinator {
         this.plan = plan;
         this.coordinatorConfig = checkpointConfig;
         this.pendingCheckpoints = new ConcurrentHashMap<>();
-        this.completedCheckpoints =
+        this.completedCheckpointIds =
                 new 
ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
         this.scheduler =
                 Executors.newScheduledThreadPool(
@@ -178,6 +179,7 @@ public class CheckpointCoordinator {
                                             "checkpoint-coordinator-%s/%s", 
pipelineId, jobId));
                             return thread;
                         });
+        ((ScheduledThreadPoolExecutor) 
this.scheduler).setRemoveOnCancelPolicy(true);
         this.serializer = new ProtoStuffSerializer();
         this.pipelineTasks = getPipelineTasks(plan.getPipelineSubtasks());
         this.pipelineTaskStatus = new ConcurrentHashMap<>();
@@ -406,18 +408,18 @@ public class CheckpointCoordinator {
                                 shutdown));
                 return;
             }
-            if (checkpointType.isFinalCheckpoint() || 
checkpointType.isSchemaChangeCheckpoint()) {
-                if (pendingCounter.get() > 0) {
-                    scheduleTriggerPendingCheckpoint(checkpointType, 500L);
-                    return;
-                }
-            }
 
             if (schemaChanging.get() && checkpointType.isGeneralCheckpoint()) {
                 LOG.info("skip trigger generic-checkpoint because schema 
change in progress");
                 return;
             }
 
+            if (pendingCounter.get() > 0) {
+                scheduleTriggerPendingCheckpoint(checkpointType, 500L);
+                LOG.info("skip trigger checkpoint because there is already a 
pending checkpoint.");
+                return;
+            }
+
             CompletableFuture<PendingCheckpoint> pendingCheckpoint =
                     createPendingCheckpoint(currentTimestamp, checkpointType);
             startTriggerPendingCheckpoint(pendingCheckpoint);
@@ -522,19 +524,24 @@ public class CheckpointCoordinator {
                         checkpointTimeout = 
coordinatorConfig.getSchemaChangeCheckpointTimeout();
                     }
                     // TODO Need change to polling check until max timeout 
fails
-                    scheduler.schedule(
-                            () -> {
-                                // If any task is not acked within the 
checkpoint timeout
-                                if 
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId())
-                                                != null
-                                        && 
!pendingCheckpoint.isFullyAcknowledged()) {
-                                    LOG.info("timeout checkpoint: " + 
pendingCheckpoint.getInfo());
-                                    handleCoordinatorError(
-                                            
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
-                                }
-                            },
-                            checkpointTimeout,
-                            TimeUnit.MILLISECONDS);
+                    pendingCheckpoint.setCheckpointTimeOutFuture(
+                            scheduler.schedule(
+                                    () -> {
+                                        // If any task is not acked within the 
checkpoint
+                                        // timeout
+                                        if (pendingCheckpoints.get(
+                                                                
pendingCheckpoint.getCheckpointId())
+                                                        != null
+                                                && 
!pendingCheckpoint.isFullyAcknowledged()) {
+                                            LOG.info(
+                                                    "timeout checkpoint: "
+                                                            + 
pendingCheckpoint.getInfo());
+                                            handleCoordinatorError(
+                                                    
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
+                                        }
+                                    },
+                                    checkpointTimeout,
+                                    TimeUnit.MILLISECONDS));
                 });
     }
 
@@ -695,7 +702,7 @@ public class CheckpointCoordinator {
                 completedCheckpoint.getCheckpointTimestamp(),
                 completedCheckpoint.getCompletedTimestamp());
         final long checkpointId = completedCheckpoint.getCheckpointId();
-        completedCheckpoints.addLast(completedCheckpoint);
+        
completedCheckpointIds.addLast(String.valueOf(completedCheckpoint.getCheckpointId()));
         try {
             byte[] states = serializer.serialize(completedCheckpoint);
             checkpointStorage.storeCheckPoint(
@@ -705,18 +712,17 @@ public class CheckpointCoordinator {
                             .pipelineId(pipelineId)
                             .states(states)
                             .build());
-            if (completedCheckpoints.size()
+            if (completedCheckpointIds.size()
                                     % 
coordinatorConfig.getStorage().getMaxRetainedCheckpoints()
                             == 0
-                    && completedCheckpoints.size()
+                    && completedCheckpointIds.size()
                                     / 
coordinatorConfig.getStorage().getMaxRetainedCheckpoints()
                             > 1) {
                 List<String> needDeleteCheckpointId = new ArrayList<>();
                 for (int i = 0;
                         i < 
coordinatorConfig.getStorage().getMaxRetainedCheckpoints();
                         i++) {
-                    needDeleteCheckpointId.add(
-                            
completedCheckpoints.removeFirst().getCheckpointId() + "");
+                    
needDeleteCheckpointId.add(completedCheckpointIds.removeFirst());
                 }
                 checkpointStorage.deleteCheckpoint(
                         String.valueOf(completedCheckpoint.getJobId()),
@@ -734,7 +740,7 @@ public class CheckpointCoordinator {
                 completedCheckpoint.getJobId());
         latestCompletedCheckpoint = completedCheckpoint;
         notifyCompleted(completedCheckpoint);
-        pendingCheckpoints.remove(checkpointId);
+        
pendingCheckpoints.remove(checkpointId).abortCheckpointTimeoutFutureWhenIsCompleted();
         pendingCounter.decrementAndGet();
         if (isCompleted()) {
             
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index b2a46fee05..1d92096369 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.internal.Nullable;
 import lombok.Getter;
+import lombok.Setter;
 
 import java.time.Instant;
 import java.util.List;
@@ -34,6 +35,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
 
 public class PendingCheckpoint implements Checkpoint {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingCheckpoint.class);
@@ -57,6 +59,8 @@ public class PendingCheckpoint implements Checkpoint {
 
     @Getter private CheckpointException failureCause;
 
+    @Setter ScheduledFuture<?> checkpointTimeOutFuture;
+
     public PendingCheckpoint(
             long jobId,
             int pipelineId,
@@ -174,6 +178,15 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
+    // Avoid memory leak in ScheduledThreadPoolExecutor due to overly long timeout settings causing
+    // numerous completed checkpoints to remain
+    public void abortCheckpointTimeoutFutureWhenIsCompleted() {
+        if (checkpointTimeOutFuture == null) {
+            return;
+        }
+        checkpointTimeOutFuture.cancel(false);
+    }
+
     public String getInfo() {
         return String.format(
                 "%s/%s/%s, %s",

Reply via email to