This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 45609ef1df [Fix][Zeta] Fix Imap resource leak (#9696)
45609ef1df is described below

commit 45609ef1dfd88d4844a90eec0d0984b1797e44d4
Author: dy102 <[email protected]>
AuthorDate: Thu Aug 21 12:03:46 2025 +0900

    [Fix][Zeta] Fix Imap resource leak (#9696)
    
    Co-authored-by: Jia Fan <[email protected]>
---
 .../engine/server/CoordinatorService.java          |   2 +
 .../server/checkpoint/CheckpointCoordinator.java   |   4 +
 .../server/checkpoint/CheckpointManager.java       |   3 -
 .../seatunnel/engine/server/master/JobMaster.java  |  13 +++
 .../engine/server/CoordinatorServiceTest.java      | 103 +++++++++++++++++++--
 .../src/test/resources/batch_fake_to_console.conf  |  48 ++++++++++
 .../src/test/resources/batch_slot_not_enough.conf  |  47 ++++++++++
 .../src/test/resources/seatunnel_fixed_slots.yaml  |  34 +++++++
 8 files changed, 245 insertions(+), 9 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 621c5fc91a..166f0dd961 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -212,6 +212,7 @@ public class CoordinatorService {
                                 
.setNameFormat("seatunnel-coordinator-service-%d")
                                 .build(),
                         new ThreadPoolStatus.RejectionCountingHandler());
+
         this.seaTunnelServer = seaTunnelServer;
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
@@ -287,6 +288,7 @@ public class CoordinatorService {
             } else {
                 queueRemove(jobMaster);
                 completeFailJob(jobMaster);
+                pendingJobMasterMap.remove(jobId);
                 return;
             }
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 505dc32051..957090494c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -1069,6 +1069,10 @@ public class CheckpointCoordinator {
         }
     }
 
+    public String getCheckpointStateImapKey() {
+        return checkpointStateImapKey;
+    }
+
     /** Only for test */
     @VisibleForTesting
     public PendingCheckpoint getSavepointPendingCheckpoint() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index db78e47eb1..d579fea9ec 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
-import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-
 import org.apache.seatunnel.api.tracing.MDCTracer;
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
@@ -176,7 +174,6 @@ public class CheckpointManager {
         
getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(errorMsg);
     }
 
-    @VisibleForTesting
     public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
         CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
         if (coordinator == null) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index ee19e4dd85..59dfb99763 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.master;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -624,6 +626,12 @@ public class JobMaster {
                                                 
runningJobStateTimestampsIMap.remove(
                                                         
task.getTaskGroupLocation());
                                             });
+
+                            String checkpointStateImapKey =
+                                    checkpointManager
+                                            
.getCheckpointCoordinator(pipeline.getPipelineId())
+                                            .getCheckpointStateImapKey();
+                            runningJobStateIMap.remove(checkpointStateImapKey);
                         });
 
         runningJobStateIMap.remove(jobId);
@@ -1084,4 +1092,9 @@ public class JobMaster {
     public CoordinatorService getCoordinatorService() {
         return this.seaTunnelServer.getCoordinatorService();
     }
+
+    @VisibleForTesting
+    public IMap<Object, Object> getRunningJobStateIMap() {
+        return runningJobStateIMap;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index d7913b56ee..7a9ef89ec3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -36,7 +36,10 @@ import org.junitpioneer.jupiter.SetEnvironmentVariable;
 
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.map.IMap;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
@@ -149,11 +152,72 @@ public class CoordinatorServiceTest {
     }
 
     @Test
-    public void testClearCoordinatorService() {
+    void testCleanupPendingJobMasterMapAfterJobFailed() {
+        setConfigFile("seatunnel_fixed_slots.yaml");
+
+        JobInformation jobInformation =
+                submitJob(
+                        
"CoordinatorServiceTest_testCleanupPendingJobMasterMapAfterJobFailed",
+                        "batch_slot_not_enough.conf",
+                        
"test_cleanup_pending_job_master_map_after_job_failed");
+
+        Assertions.assertNotNull(
+                
jobInformation.coordinatorService.pendingJobMasterMap.get(jobInformation.jobId));
+
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertNull(
+                                        
jobInformation.coordinatorService.pendingJobMasterMap.get(
+                                                jobInformation.jobId)));
+
+        jobInformation.coordinatorService.clearCoordinatorService();
+        jobInformation.coordinatorServiceTest.shutdown();
+
+        setDefaultConfigFile();
+    }
+
+    @Test
+    void testCleanupRunningJobStateIMap() {
+        JobInformation jobInformation =
+                submitJob(
+                        
"CoordinatorServiceTest_testCleanupRunningJobStateIMap",
+                        "batch_fake_to_console.conf",
+                        "test_cleanup_running_job_state_imap");
+        CoordinatorService coordinatorService = 
jobInformation.coordinatorService;
+        IMap<Object, Object> runningJobStateIMap =
+                
coordinatorService.getJobMaster(jobInformation.jobId).getRunningJobStateIMap();
+        Assertions.assertTrue(!runningJobStateIMap.isEmpty());
+
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> 
Assertions.assertTrue(runningJobStateIMap.isEmpty()));
+
+        jobInformation.coordinatorService.clearCoordinatorService();
+        jobInformation.coordinatorServiceTest.shutdown();
+    }
+
+    private void setDefaultConfigFile() {
+        setConfigFile("seatunnel.yaml");
+    }
+
+    private void setConfigFile(String fileName) {
+        String rootModuleDir = "seatunnel-engine";
+        Path path = Paths.get(System.getProperty("user.dir"));
+        while (!path.endsWith(Paths.get(rootModuleDir))) {
+            path = path.getParent();
+        }
+        String rootPath = path.getParent().toString();
+        System.setProperty(
+                "seatunnel.config",
+                rootPath
+                        + 
"/seatunnel-engine/seatunnel-engine-server/src/test/resources/"
+                        + fileName);
+    }
+
+    private JobInformation submitJob(String testClassName, String 
jobConfigFile, String jobName) {
         HazelcastInstanceImpl coordinatorServiceTest =
                 SeaTunnelServerStarter.createHazelcastInstance(
-                        TestUtils.getClusterName(
-                                
"CoordinatorServiceTest_testClearCoordinatorService"));
+                        TestUtils.getClusterName(testClassName));
         SeaTunnelServer server1 =
                 coordinatorServiceTest
                         .node
@@ -166,9 +230,7 @@ public class CoordinatorServiceTest {
                 coordinatorServiceTest
                         
.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)
                         .newId();
-        LogicalDag testLogicalDag =
-                TestUtils.createTestLogicalPlan(
-                        "stream_fake_to_console.conf", 
"test_clear_coordinator_service", jobId);
+        LogicalDag testLogicalDag = 
TestUtils.createTestLogicalPlan(jobConfigFile, jobName, jobId);
 
         JobImmutableInformation jobImmutableInformation =
                 new JobImmutableInformation(
@@ -185,6 +247,20 @@ public class CoordinatorServiceTest {
         coordinatorService
                 .submitJob(jobId, data, 
jobImmutableInformation.isStartWithSavePoint())
                 .join();
+        return new JobInformation(coordinatorServiceTest, coordinatorService, 
jobId);
+    }
+
+    @Test
+    public void testClearCoordinatorService() {
+        JobInformation jobInformation =
+                submitJob(
+                        "CoordinatorServiceTest_testClearCoordinatorService",
+                        "stream_fake_to_console.conf",
+                        "test_clear_coordinator_service");
+
+        CoordinatorService coordinatorService = 
jobInformation.coordinatorService;
+        Long jobId = jobInformation.jobId;
+        HazelcastInstanceImpl coordinatorServiceTest = 
jobInformation.coordinatorServiceTest;
 
         // waiting for job status turn to running
         await().atMost(10000, TimeUnit.MILLISECONDS)
@@ -349,4 +425,19 @@ public class CoordinatorServiceTest {
                             .size());
         }
     }
+
+    private static class JobInformation {
+        public final HazelcastInstanceImpl coordinatorServiceTest;
+        public final CoordinatorService coordinatorService;
+        public final Long jobId;
+
+        public JobInformation(
+                HazelcastInstanceImpl coordinatorServiceTest,
+                CoordinatorService coordinatorService,
+                Long jobId) {
+            this.coordinatorServiceTest = coordinatorServiceTest;
+            this.coordinatorService = coordinatorService;
+            this.jobId = jobId;
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console.conf
new file mode 100644
index 0000000000..3c87ae4af4
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    plugin_output = "fake"
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    plugin_input="fake"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_slot_not_enough.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_slot_not_enough.conf
new file mode 100644
index 0000000000..76d5e18dd2
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_slot_not_enough.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    plugin_output = "fake"
+    parallelism = 6
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    plugin_input="fake"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_fixed_slots.yaml
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_fixed_slots.yaml
new file mode 100644
index 0000000000..0fbdcf64d0
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_fixed_slots.yaml
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+  engine:
+    backup-count: 1
+    print-execution-info-interval: 10
+    slot-service:
+      dynamic-slot: false
+      slot-num: 5
+    checkpoint:
+      interval: 6000
+      timeout: 7000
+      storage:
+        type: hdfs
+        max-retained: 3
+        plugin-config:
+          namespace: /tmp/seatunnel/checkpoint_snapshot
+          storage.type: hdfs
+          fs.defaultFS: file:/// # Ensure that the directory has write permission
\ No newline at end of file

Reply via email to