This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 17a416455e [Bugfix][zeta] Fix memory leak issues related to checkpoints (#5539)
17a416455e is described below
commit 17a416455ed09627558c85fd294e04648d0c5805
Author: ic4y <[email protected]>
AuthorDate: Mon Sep 25 19:17:52 2023 +0800
[Bugfix][zeta] Fix memory leak issues related to checkpoints (#5539)
---
.../server/checkpoint/CheckpointCoordinator.java | 60 ++++++++++++----------
.../server/checkpoint/PendingCheckpoint.java | 13 +++++
2 files changed, 46 insertions(+), 27 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 222f60a5cb..e3add95cb9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -64,6 +64,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -111,7 +112,7 @@ public class CheckpointCoordinator {
private final Set<TaskLocation> readyToCloseStartingTask;
private final ConcurrentHashMap<Long, PendingCheckpoint>
pendingCheckpoints;
- private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
+ private final ArrayDeque<String> completedCheckpointIds;
private volatile CompletedCheckpoint latestCompletedCheckpoint = null;
@@ -165,7 +166,7 @@ public class CheckpointCoordinator {
this.plan = plan;
this.coordinatorConfig = checkpointConfig;
this.pendingCheckpoints = new ConcurrentHashMap<>();
- this.completedCheckpoints =
+ this.completedCheckpointIds =
new
ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
this.scheduler =
Executors.newScheduledThreadPool(
@@ -178,6 +179,7 @@ public class CheckpointCoordinator {
"checkpoint-coordinator-%s/%s",
pipelineId, jobId));
return thread;
});
+ ((ScheduledThreadPoolExecutor)
this.scheduler).setRemoveOnCancelPolicy(true);
this.serializer = new ProtoStuffSerializer();
this.pipelineTasks = getPipelineTasks(plan.getPipelineSubtasks());
this.pipelineTaskStatus = new ConcurrentHashMap<>();
@@ -406,18 +408,18 @@ public class CheckpointCoordinator {
shutdown));
return;
}
- if (checkpointType.isFinalCheckpoint() ||
checkpointType.isSchemaChangeCheckpoint()) {
- if (pendingCounter.get() > 0) {
- scheduleTriggerPendingCheckpoint(checkpointType, 500L);
- return;
- }
- }
if (schemaChanging.get() && checkpointType.isGeneralCheckpoint()) {
LOG.info("skip trigger generic-checkpoint because schema
change in progress");
return;
}
+ if (pendingCounter.get() > 0) {
+ scheduleTriggerPendingCheckpoint(checkpointType, 500L);
+ LOG.info("skip trigger checkpoint because there is already a
pending checkpoint.");
+ return;
+ }
+
CompletableFuture<PendingCheckpoint> pendingCheckpoint =
createPendingCheckpoint(currentTimestamp, checkpointType);
startTriggerPendingCheckpoint(pendingCheckpoint);
@@ -522,19 +524,24 @@ public class CheckpointCoordinator {
checkpointTimeout =
coordinatorConfig.getSchemaChangeCheckpointTimeout();
}
// TODO Need change to polling check until max timeout
fails
- scheduler.schedule(
- () -> {
- // If any task is not acked within the
checkpoint timeout
- if
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId())
- != null
- &&
!pendingCheckpoint.isFullyAcknowledged()) {
- LOG.info("timeout checkpoint: " +
pendingCheckpoint.getInfo());
- handleCoordinatorError(
-
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
- }
- },
- checkpointTimeout,
- TimeUnit.MILLISECONDS);
+ pendingCheckpoint.setCheckpointTimeOutFuture(
+ scheduler.schedule(
+ () -> {
+ // If any task is not acked within the
checkpoint
+ // timeout
+ if (pendingCheckpoints.get(
+
pendingCheckpoint.getCheckpointId())
+ != null
+ &&
!pendingCheckpoint.isFullyAcknowledged()) {
+ LOG.info(
+ "timeout checkpoint: "
+ +
pendingCheckpoint.getInfo());
+ handleCoordinatorError(
+
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
+ }
+ },
+ checkpointTimeout,
+ TimeUnit.MILLISECONDS));
});
}
@@ -695,7 +702,7 @@ public class CheckpointCoordinator {
completedCheckpoint.getCheckpointTimestamp(),
completedCheckpoint.getCompletedTimestamp());
final long checkpointId = completedCheckpoint.getCheckpointId();
- completedCheckpoints.addLast(completedCheckpoint);
+
completedCheckpointIds.addLast(String.valueOf(completedCheckpoint.getCheckpointId()));
try {
byte[] states = serializer.serialize(completedCheckpoint);
checkpointStorage.storeCheckPoint(
@@ -705,18 +712,17 @@ public class CheckpointCoordinator {
.pipelineId(pipelineId)
.states(states)
.build());
- if (completedCheckpoints.size()
+ if (completedCheckpointIds.size()
%
coordinatorConfig.getStorage().getMaxRetainedCheckpoints()
== 0
- && completedCheckpoints.size()
+ && completedCheckpointIds.size()
/
coordinatorConfig.getStorage().getMaxRetainedCheckpoints()
> 1) {
List<String> needDeleteCheckpointId = new ArrayList<>();
for (int i = 0;
i <
coordinatorConfig.getStorage().getMaxRetainedCheckpoints();
i++) {
- needDeleteCheckpointId.add(
-
completedCheckpoints.removeFirst().getCheckpointId() + "");
+
needDeleteCheckpointId.add(completedCheckpointIds.removeFirst());
}
checkpointStorage.deleteCheckpoint(
String.valueOf(completedCheckpoint.getJobId()),
@@ -734,7 +740,7 @@ public class CheckpointCoordinator {
completedCheckpoint.getJobId());
latestCompletedCheckpoint = completedCheckpoint;
notifyCompleted(completedCheckpoint);
- pendingCheckpoints.remove(checkpointId);
+
pendingCheckpoints.remove(checkpointId).abortCheckpointTimeoutFutureWhenIsCompleted();
pendingCounter.decrementAndGet();
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index b2a46fee05..1d92096369 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import com.beust.jcommander.internal.Nullable;
import lombok.Getter;
+import lombok.Setter;
import java.time.Instant;
import java.util.List;
@@ -34,6 +35,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
public class PendingCheckpoint implements Checkpoint {
private static final Logger LOG =
LoggerFactory.getLogger(PendingCheckpoint.class);
@@ -57,6 +59,8 @@ public class PendingCheckpoint implements Checkpoint {
@Getter private CheckpointException failureCause;
+ @Setter ScheduledFuture<?> checkpointTimeOutFuture;
+
public PendingCheckpoint(
long jobId,
int pipelineId,
@@ -174,6 +178,15 @@ public class PendingCheckpoint implements Checkpoint {
}
}
+ // Avoid memory leak in ScheduledThreadPoolExecutor due to overly long timeout settings causing
+ // numerous completed checkpoints to remain
+ public void abortCheckpointTimeoutFutureWhenIsCompleted() {
+ if (checkpointTimeOutFuture == null) {
+ return;
+ }
+ checkpointTimeOutFuture.cancel(false);
+ }
+
public String getInfo() {
return String.format(
"%s/%s/%s, %s",