This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 45609ef1df [Fix][Zeta] Fix Imap resource leak (#9696)
45609ef1df is described below
commit 45609ef1dfd88d4844a90eec0d0984b1797e44d4
Author: dy102 <[email protected]>
AuthorDate: Thu Aug 21 12:03:46 2025 +0900
[Fix][Zeta] Fix Imap resource leak (#9696)
Co-authored-by: Jia Fan <[email protected]>
---
.../engine/server/CoordinatorService.java | 2 +
.../server/checkpoint/CheckpointCoordinator.java | 4 +
.../server/checkpoint/CheckpointManager.java | 3 -
.../seatunnel/engine/server/master/JobMaster.java | 13 +++
.../engine/server/CoordinatorServiceTest.java | 103 +++++++++++++++++++--
.../src/test/resources/batch_fake_to_console.conf | 48 ++++++++++
.../src/test/resources/batch_slot_not_enough.conf | 47 ++++++++++
.../src/test/resources/seatunnel_fixed_slots.yaml | 34 +++++++
8 files changed, 245 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 621c5fc91a..166f0dd961 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -212,6 +212,7 @@ public class CoordinatorService {
.setNameFormat("seatunnel-coordinator-service-%d")
.build(),
new ThreadPoolStatus.RejectionCountingHandler());
+
this.seaTunnelServer = seaTunnelServer;
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
@@ -287,6 +288,7 @@ public class CoordinatorService {
} else {
queueRemove(jobMaster);
completeFailJob(jobMaster);
+ pendingJobMasterMap.remove(jobId);
return;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 505dc32051..957090494c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -1069,6 +1069,10 @@ public class CheckpointCoordinator {
}
}
+ public String getCheckpointStateImapKey() {
+ return checkpointStateImapKey;
+ }
+
/** Only for test */
@VisibleForTesting
public PendingCheckpoint getSavepointPendingCheckpoint() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index db78e47eb1..d579fea9ec 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.engine.server.checkpoint;
-import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
@@ -176,7 +174,6 @@ public class CheckpointManager {
getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(errorMsg);
}
- @VisibleForTesting
public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
if (coordinator == null) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index ee19e4dd85..59dfb99763 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.master;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -624,6 +626,12 @@ public class JobMaster {
runningJobStateTimestampsIMap.remove(
task.getTaskGroupLocation());
});
+
+ String checkpointStateImapKey =
+ checkpointManager
+
.getCheckpointCoordinator(pipeline.getPipelineId())
+ .getCheckpointStateImapKey();
+ runningJobStateIMap.remove(checkpointStateImapKey);
});
runningJobStateIMap.remove(jobId);
@@ -1084,4 +1092,9 @@ public class JobMaster {
public CoordinatorService getCoordinatorService() {
return this.seaTunnelServer.getCoordinatorService();
}
+
+ @VisibleForTesting
+ public IMap<Object, Object> getRunningJobStateIMap() {
+ return runningJobStateIMap;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index d7913b56ee..7a9ef89ec3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -36,7 +36,10 @@ import org.junitpioneer.jupiter.SetEnvironmentVariable;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.map.IMap;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
@@ -149,11 +152,72 @@ public class CoordinatorServiceTest {
}
@Test
- public void testClearCoordinatorService() {
+ void testCleanupPendingJobMasterMapAfterJobFailed() {
+ setConfigFile("seatunnel_fixed_slots.yaml");
+
+ JobInformation jobInformation =
+ submitJob(
+
"CoordinatorServiceTest_testCleanupPendingJobMasterMapAfterJobFailed",
+ "batch_slot_not_enough.conf",
+
"test_cleanup_pending_job_master_map_after_job_failed");
+
+ Assertions.assertNotNull(
+
jobInformation.coordinatorService.pendingJobMasterMap.get(jobInformation.jobId));
+
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertNull(
+
jobInformation.coordinatorService.pendingJobMasterMap.get(
+ jobInformation.jobId)));
+
+ jobInformation.coordinatorService.clearCoordinatorService();
+ jobInformation.coordinatorServiceTest.shutdown();
+
+ setDefaultConfigFile();
+ }
+
+ @Test
+ void testCleanupRunningJobStateIMap() {
+ JobInformation jobInformation =
+ submitJob(
+
"CoordinatorServiceTest_testCleanupRunningJobStateIMap",
+ "batch_fake_to_console.conf",
+ "test_cleanup_running_job_state_imap");
+ CoordinatorService coordinatorService =
jobInformation.coordinatorService;
+ IMap<Object, Object> runningJobStateIMap =
+
coordinatorService.getJobMaster(jobInformation.jobId).getRunningJobStateIMap();
+ Assertions.assertTrue(!runningJobStateIMap.isEmpty());
+
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() ->
Assertions.assertTrue(runningJobStateIMap.isEmpty()));
+
+ jobInformation.coordinatorService.clearCoordinatorService();
+ jobInformation.coordinatorServiceTest.shutdown();
+ }
+
+ private void setDefaultConfigFile() {
+ setConfigFile("seatunnel.yaml");
+ }
+
+ private void setConfigFile(String fileName) {
+ String rootModuleDir = "seatunnel-engine";
+ Path path = Paths.get(System.getProperty("user.dir"));
+ while (!path.endsWith(Paths.get(rootModuleDir))) {
+ path = path.getParent();
+ }
+ String rootPath = path.getParent().toString();
+ System.setProperty(
+ "seatunnel.config",
+ rootPath
+ +
"/seatunnel-engine/seatunnel-engine-server/src/test/resources/"
+ + fileName);
+ }
+
+ private JobInformation submitJob(String testClassName, String
jobConfigFile, String jobName) {
HazelcastInstanceImpl coordinatorServiceTest =
SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(
-
"CoordinatorServiceTest_testClearCoordinatorService"));
+ TestUtils.getClusterName(testClassName));
SeaTunnelServer server1 =
coordinatorServiceTest
.node
@@ -166,9 +230,7 @@ public class CoordinatorServiceTest {
coordinatorServiceTest
.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)
.newId();
- LogicalDag testLogicalDag =
- TestUtils.createTestLogicalPlan(
- "stream_fake_to_console.conf",
"test_clear_coordinator_service", jobId);
+ LogicalDag testLogicalDag =
TestUtils.createTestLogicalPlan(jobConfigFile, jobName, jobId);
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
@@ -185,6 +247,20 @@ public class CoordinatorServiceTest {
coordinatorService
.submitJob(jobId, data,
jobImmutableInformation.isStartWithSavePoint())
.join();
+ return new JobInformation(coordinatorServiceTest, coordinatorService,
jobId);
+ }
+
+ @Test
+ public void testClearCoordinatorService() {
+ JobInformation jobInformation =
+ submitJob(
+ "CoordinatorServiceTest_testClearCoordinatorService",
+ "stream_fake_to_console.conf",
+ "test_clear_coordinator_service");
+
+ CoordinatorService coordinatorService =
jobInformation.coordinatorService;
+ Long jobId = jobInformation.jobId;
+ HazelcastInstanceImpl coordinatorServiceTest =
jobInformation.coordinatorServiceTest;
// waiting for job status turn to running
await().atMost(10000, TimeUnit.MILLISECONDS)
@@ -349,4 +425,19 @@ public class CoordinatorServiceTest {
.size());
}
}
+
+ private static class JobInformation {
+ public final HazelcastInstanceImpl coordinatorServiceTest;
+ public final CoordinatorService coordinatorService;
+ public final Long jobId;
+
+ public JobInformation(
+ HazelcastInstanceImpl coordinatorServiceTest,
+ CoordinatorService coordinatorService,
+ Long jobId) {
+ this.coordinatorServiceTest = coordinatorServiceTest;
+ this.coordinatorService = coordinatorService;
+ this.jobId = jobId;
+ }
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console.conf
new file mode 100644
index 0000000000..3c87ae4af4
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ plugin_output = "fake"
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ plugin_input="fake"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_slot_not_enough.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_slot_not_enough.conf
new file mode 100644
index 0000000000..76d5e18dd2
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_slot_not_enough.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ plugin_output = "fake"
+ parallelism = 6
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ plugin_input="fake"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_fixed_slots.yaml
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_fixed_slots.yaml
new file mode 100644
index 0000000000..0fbdcf64d0
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_fixed_slots.yaml
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+ engine:
+ backup-count: 1
+ print-execution-info-interval: 10
+ slot-service:
+ dynamic-slot: false
+ slot-num: 5
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ namespace: /tmp/seatunnel/checkpoint_snapshot
+ storage.type: hdfs
+ fs.defaultFS: file:/// # Ensure that the directory has written
permission
\ No newline at end of file