[ https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814046#comment-17814046 ]
Rui Fan edited comment on FLINK-33981 at 2/5/24 7:10 AM:
---------------------------------------------------------
Hi [~fengjiajie] , would you mind backporting it to 1.17 and master branch as
well? Thanks a lot.
Merged to:
master(1.19) via: 360c1a0831b64bfb79699d81325c8f9783517be1
1.18 via: fc54ffd0c3e77f7fa01ce04c41291bd80900288f
1.17 via: 1419619b07122b5c734105311d8eff939a687596
was (Author: fanrui):
Hi [~fengjiajie] , would you mind backporting it to 1.17 and master branch as
well? Thanks a lot.
Merged to:
1.18 via: fc54ffd0c3e77f7fa01ce04c41291bd80900288f
> File Descriptor References Not Released After Job Execution in MiniCluster Mode
> -------------------------------------------------------------------------------
>
> Key: FLINK-33981
> URL: https://issues.apache.org/jira/browse/FLINK-33981
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.18.0
> Reporter: Feng Jiajie
> Assignee: Feng Jiajie
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> When using MiniCluster mode, file descriptors for directories such as
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are
> not released after a job completes. Running multiple jobs in the same JVM
> therefore accumulates leaked file descriptors, which can lead to problems.
> After running the reproduction code below and waiting until it reaches the
> final sleep, *lsof -p 18162* (18162 being the PID of the JVM) shows:
> {code:java}
> ...
> java 18162 sa_cluster 30r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java 18162 sa_cluster 31r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java 18162 sa_cluster 32r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java 18162 sa_cluster 33r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java 18162 sa_cluster 34r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java 18162 sa_cluster 35r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java 18162 sa_cluster 36r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java 18162 sa_cluster 37r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java 18162 sa_cluster 38r DIR 253,1 0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
>
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
>
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>     public static void main(String[] args) throws Exception {
>         for (int i = 0; i < 10; ++i) {
>             int round = i;
>             Thread thread = new Thread(() -> {
>                 try {
>                     Configuration configuration = new Configuration();
>                     final StreamExecutionEnvironment env =
>                             StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>                     env.setParallelism(1);
>                     DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 100000);
>                     longDataStreamSource.addSink(new DiscardingSink<>());
>                     StreamGraph streamGraph = env.getStreamGraph();
>                     streamGraph.setJobName("test-" + System.nanoTime());
>                     JobClient jobClient = env.executeAsync(streamGraph);
>                     CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
>                             jobClient.getJobExecutionResult();
>                     JobExecutionResult jobExecutionResult = null;
>                     while (jobExecutionResult == null) {
>                         try {
>                             jobExecutionResult =
>                                     jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
>                         } catch (TimeoutException timeoutException) {
>                             // ignore
>                         }
>                     }
>                     System.out.println("finished round: " + round);
>                     env.close();
>                 } catch (Exception e) {
>                     throw new RuntimeException(e);
>                 }
>             });
>             thread.setDaemon(true);
>             thread.start();
>             thread.join();
>             System.out.println("done ... " + i);
>         }
>
>         // ======================= lsof -p 18162
>         Thread.sleep(500_000_000);
>     }
> }
> {code}
> The above code consistently reproduces the issue on Flink 1.18.0; the issue
> does not occur on Flink 1.14.6.
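
The lsof check described in the report can also be done from inside the JVM. The following is a minimal sketch, not part of the original report or of the fix: it assumes a Linux host where /proc/self/fd is available, and the class name FdLeakCheck and method countFdsMatching are hypothetical names chosen for illustration. It counts the descriptors whose link target contains a given substring, such as "localState".

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper (not part of Flink): counts descriptors by link target. */
public class FdLeakCheck {

    /**
     * Counts the symbolic links in fdDir whose target contains marker.
     * On Linux, /proc/self/fd holds one such link per open descriptor.
     */
    public static int countFdsMatching(Path fdDir, String marker) throws IOException {
        int count = 0;
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(fdDir)) {
            for (Path fd : fds) {
                try {
                    if (Files.readSymbolicLink(fd).toString().contains(marker)) {
                        count++;
                    }
                } catch (IOException e) {
                    // The entry is not a symlink, or the descriptor was closed
                    // between listing and reading; skip it.
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: Linux procfs layout.
        int open = countFdsMatching(Paths.get("/proc/self/fd"), "localState");
        System.out.println("open descriptors referencing localState: " + open);
    }
}
```

One way to use it would be to call countFdsMatching(Paths.get("/proc/self/fd"), "localState") in TestReleaseFd after the loop and before the final sleep; a count that grows with the number of rounds would correspond to the leftover entries that lsof lists above.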