This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new c6265bff574 [BP-2.0][FLINK-36838][state/forst] Fix the deadlock when quit forst state backend (#26054)
c6265bff574 is described below

commit c6265bff574afb1cbee41ff68bb7ebde7a7b1de9
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Thu Jan 23 10:27:25 2025 +0800

    [BP-2.0][FLINK-36838][state/forst] Fix the deadlock when quit forst state backend (#26054)
---
 flink-dist/src/main/resources/META-INF/NOTICE             |  2 +-
 flink-state-backends/flink-statebackend-forst/pom.xml     |  2 +-
 .../apache/flink/state/forst/ForStResourceContainer.java  | 15 ++-------------
 3 files changed, 4 insertions(+), 15 deletions(-)

diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE
index 2d28859adaa..f09cf709b92 100644
--- a/flink-dist/src/main/resources/META-INF/NOTICE
+++ b/flink-dist/src/main/resources/META-INF/NOTICE
@@ -9,7 +9,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - com.google.code.findbugs:jsr305:1.3.9
 - com.twitter:chill-java:0.7.6
 - com.ververica:frocksdbjni:8.10.0-ververica-beta-1.0
-- com.ververica:forstjni:0.1.5
+- com.ververica:forstjni:0.1.6
 - commons-cli:commons-cli:1.5.0
 - commons-collections:commons-collections:3.2.2
 - commons-io:commons-io:2.15.1
diff --git a/flink-state-backends/flink-statebackend-forst/pom.xml 
b/flink-state-backends/flink-statebackend-forst/pom.xml
index d3e5740fdef..433e1f6ff04 100644
--- a/flink-state-backends/flink-statebackend-forst/pom.xml
+++ b/flink-state-backends/flink-statebackend-forst/pom.xml
@@ -63,7 +63,7 @@ under the License.
                <dependency>
                        <groupId>com.ververica</groupId>
                        <artifactId>forstjni</artifactId>
-                       <version>0.1.5</version>
+                       <version>0.1.6</version>
                </dependency>
 
                <!-- test dependencies -->
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index c0590bb5a1a..ba76b72371d 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -45,7 +45,6 @@ import org.forstdb.Filter;
 import org.forstdb.FlinkEnv;
 import org.forstdb.IndexType;
 import org.forstdb.PlainTableConfig;
-import org.forstdb.Priority;
 import org.forstdb.ReadOptions;
 import org.forstdb.Statistics;
 import org.forstdb.TableFormatConfig;
@@ -85,8 +84,6 @@ public final class ForStResourceContainer implements AutoCloseable {
     // and the db data dir's absolute path will be used as the log file name's prefix.
     private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length();
 
-    @Nullable private FlinkEnv flinkEnv = null;
-
     @Nullable private final Path remoteBasePath;
 
     @Nullable private final Path remoteForStPath;
@@ -239,11 +236,12 @@ public final class ForStResourceContainer implements AutoCloseable {
         // configured,
         //  fallback to local directory currently temporarily.
         if (remoteForStPath != null) {
-            flinkEnv =
+            FlinkEnv flinkEnv =
                     new FlinkEnv(
                             remoteBasePath.toString(),
                             new StringifiedForStFileSystem(forStFileSystem));
             opt.setEnv(flinkEnv);
+            handlesToClose.add(flinkEnv);
         }
 
         return opt;
@@ -469,15 +467,6 @@ public final class ForStResourceContainer implements AutoCloseable {
             sharedResources.close();
         }
         cleanRelocatedDbLogs();
-        if (flinkEnv != null) {
-            // There is something wrong with the FlinkEnv, the background threads won't quit during
-            // the disposal of DB. We explicit shrink the thread pool here until the ForSt repo
-            // fixes that.
-            flinkEnv.setBackgroundThreads(0, Priority.LOW);
-            flinkEnv.setBackgroundThreads(0, Priority.HIGH);
-            flinkEnv.close();
-            flinkEnv = null;
-        }
     }
 
     /**

Reply via email to