[ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33981:
--------------------------------
    Description: 
When using MiniCluster mode, file descriptors like 
*{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are not 
released after a Job completes. Executing multiple Jobs in the same JVM might 
result in leftover file descriptors, potentially leading to problems.

After executing the reproducing code provided below (after entering the sleep), 
running *lsof -p 18162* reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR              253,1         0    1311962 
/tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r   DIR              253,1         0    1311962 
/tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r   DIR              253,1         0    1310787 
/tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r   DIR              253,1         0    1310787 
/tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r   DIR              253,1         0    1311960 
/tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r   DIR              253,1         0    1311960 
/tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r   DIR              253,1         0    1311974 
/tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r   DIR              253,1         0    1311974 
/tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r   DIR              253,1         0    1311979 
/tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Reproducer for FLINK-33981: submits ten Flink jobs one after another from the same JVM,
 * each from its own thread, and then sleeps so that the process's open file descriptors
 * can be inspected (for example with {@code lsof -p <pid>}).
 *
 * <p>Build and run, with the Flink jars placed under {@code lib/}:
 *
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

  /**
   * Runs ten rounds sequentially, then keeps the JVM alive for inspection.
   *
   * @param args unused
   * @throws Exception if the main thread is interrupted while joining a worker or sleeping
   */
  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 10; ++i) {
      // Effectively-final copy of the loop index so that the lambda below can capture it.
      int round = i;
      Thread thread = new Thread(() -> {
        try {
          // Each round builds a fresh environment from a new, empty Configuration.
          Configuration configuration = new Configuration();
          final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
          env.setParallelism(1);

          // Minimal pipeline: a sequence source (1 to 100000) wired to a DiscardingSink.
          DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 
100000);
          longDataStreamSource.addSink(new DiscardingSink<>());

          // Name each job with a nanoTime suffix, then submit it asynchronously.
          StreamGraph streamGraph = env.getStreamGraph();
          streamGraph.setJobName("test-" + System.nanoTime());
          JobClient jobClient = env.executeAsync(streamGraph);

          // Wait for the job result in 20-second slices, retrying until one is obtained.
          CompletableFuture<JobExecutionResult> 
jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
          JobExecutionResult jobExecutionResult = null;
          while (jobExecutionResult == null) {
            try {
              jobExecutionResult = jobExecutionResultCompletableFuture.get(20, 
TimeUnit.SECONDS);
            } catch (TimeoutException timeoutException) {
              // Timed out without a result: deliberately ignored so the loop waits again.
            }
          }
          System.out.println("finished round: " + round);
          // NOTE(review): close() is only reached on the success path; it is skipped if
          // anything above throws.
          env.close();
        } catch (Exception e) {
          // Thrown on the worker thread, so it ends this thread only; the join() below
          // still returns normally and the main loop continues.
          throw new RuntimeException(e);
        }
      });

      // Joining immediately after start makes the rounds run strictly one at a time.
      thread.setDaemon(true);
      thread.start();
      thread.join();

      System.out.println("done ... " + i);
    }
    
    // Keep the JVM alive (500,000,000 ms, roughly 5.8 days) so that file descriptors
    // can be inspected from outside; 18162 was the PID in the reporter's run.
    // ======================= lsof -p 18162
    Thread.sleep(500_000_000);
  }
}
 {code}
The issue can be consistently reproduced with the above code in Flink 1.18.0, 
but it does not occur in Flink 1.14.6.

  was:
When using MiniCluster mode, file descriptors like 
{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not 
released after a Job completes. Executing multiple Jobs in the same JVM might 
result in leftover file descriptors, potentially leading to problems.

After executing the reproducing code provided below (after entering the sleep), 
running lsof -p 18162 reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR              253,1         0    1311962 
/tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r   DIR              253,1         0    1311962 
/tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r   DIR              253,1         0    1310787 
/tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r   DIR              253,1         0    1310787 
/tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r   DIR              253,1         0    1311960 
/tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r   DIR              253,1         0    1311960 
/tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r   DIR              253,1         0    1311974 
/tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r   DIR              253,1         0    1311974 
/tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r   DIR              253,1         0    1311979 
/tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Reproducer for FLINK-33981: submits ten Flink jobs one after another from the same JVM,
 * each from its own thread, and then sleeps so that the process's open file descriptors
 * can be inspected (for example with {@code lsof -p <pid>}).
 *
 * <p>Build and run, with the Flink jars placed under {@code lib/}:
 *
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

  /**
   * Runs ten rounds sequentially, then keeps the JVM alive for inspection.
   *
   * @param args unused
   * @throws Exception if the main thread is interrupted while joining a worker or sleeping
   */
  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 10; ++i) {
      // Effectively-final copy of the loop index so that the lambda below can capture it.
      int round = i;
      Thread thread = new Thread(() -> {
        try {
          // Each round builds a fresh environment from a new, empty Configuration.
          Configuration configuration = new Configuration();
          final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
          env.setParallelism(1);

          // Minimal pipeline: a sequence source (1 to 100000) wired to a DiscardingSink.
          DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 
100000);
          longDataStreamSource.addSink(new DiscardingSink<>());

          // Name each job with a nanoTime suffix, then submit it asynchronously.
          StreamGraph streamGraph = env.getStreamGraph();
          streamGraph.setJobName("test-" + System.nanoTime());
          JobClient jobClient = env.executeAsync(streamGraph);

          // Wait for the job result in 20-second slices, retrying until one is obtained.
          CompletableFuture<JobExecutionResult> 
jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
          JobExecutionResult jobExecutionResult = null;
          while (jobExecutionResult == null) {
            try {
              jobExecutionResult = jobExecutionResultCompletableFuture.get(20, 
TimeUnit.SECONDS);
            } catch (TimeoutException timeoutException) {
              // Timed out without a result: deliberately ignored so the loop waits again.
            }
          }
          System.out.println("finished round: " + round);
          // NOTE(review): close() is only reached on the success path; it is skipped if
          // anything above throws.
          env.close();
        } catch (Exception e) {
          // Thrown on the worker thread, so it ends this thread only; the join() below
          // still returns normally and the main loop continues.
          throw new RuntimeException(e);
        }
      });

      // Joining immediately after start makes the rounds run strictly one at a time.
      thread.setDaemon(true);
      thread.start();
      thread.join();

      System.out.println("done ... " + i);
    }
    
    // Keep the JVM alive (500,000,000 ms, roughly 5.8 days) so that file descriptors
    // can be inspected from outside; 18162 was the PID in the reporter's run.
    // ======================= lsof -p 18162
    Thread.sleep(500_000_000);
  }
}
 {code}
The issue can be consistently reproduced with the above code in Flink 1.18.0, 
but it does not occur in Flink 1.14.6.


> File Descriptor References Not Released After Job Execution in MiniCluster 
> Mode
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-33981
>                 URL: https://issues.apache.org/jira/browse/FLINK-33981
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Feng Jiajie
>            Priority: Major
>
> When using MiniCluster mode, file descriptors like 
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are 
> not released after a Job completes. Executing multiple Jobs in the same JVM 
> might result in leftover file descriptors, potentially leading to problems.
> After executing the reproducing code provided below (after entering the 
> sleep), running *lsof -p 18162* reveals:
> {code:java}
> ...
> java    18162 sa_cluster   30r   DIR              253,1         0    1311962 
> /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   31r   DIR              253,1         0    1311962 
> /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   32r   DIR              253,1         0    1310787 
> /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   33r   DIR              253,1         0    1310787 
> /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   34r   DIR              253,1         0    1311960 
> /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   35r   DIR              253,1         0    1311960 
> /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   36r   DIR              253,1         0    1311974 
> /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   37r   DIR              253,1         0    1311974 
> /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   38r   DIR              253,1         0    1311979 
> /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>   public static void main(String[] args) throws Exception {
>     for (int i = 0; i < 10; ++i) {
>       int round = i;
>       Thread thread = new Thread(() -> {
>         try {
>           Configuration configuration = new Configuration();
>           final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>           env.setParallelism(1);
>           DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 
> 100000);
>           longDataStreamSource.addSink(new DiscardingSink<>());
>           StreamGraph streamGraph = env.getStreamGraph();
>           streamGraph.setJobName("test-" + System.nanoTime());
>           JobClient jobClient = env.executeAsync(streamGraph);
>           CompletableFuture<JobExecutionResult> 
> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
>           JobExecutionResult jobExecutionResult = null;
>           while (jobExecutionResult == null) {
>             try {
>               jobExecutionResult = 
> jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
>             } catch (TimeoutException timeoutException) {
>               // ignore
>             }
>           }
>           System.out.println("finished round: " + round);
>           env.close();
>         } catch (Exception e) {
>           throw new RuntimeException(e);
>         }
>       });
>       thread.setDaemon(true);
>       thread.start();
>       thread.join();
>       System.out.println("done ... " + i);
>     }
>     
>     // ======================= lsof -p 18162
>     Thread.sleep(500_000_000);
>   }
> }
>  {code}
> The issue can be consistently reproduced with the above code in Flink 1.18.0, 
> but it does not occur in Flink 1.14.6.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to