This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 830a17155cf04748ab471ddb6b4ad6a8ec1416e5
Author: Stefan Richter <[email protected]>
AuthorDate: Wed Jun 12 15:11:15 2024 +0200

    [FLINK-35580] Ensure RocksDB working directory is created before opening DB.
---
 .../contrib/streaming/state/RocksDBOperationUtils.java      | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index b511bbfc620..74800101cd0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -76,13 +77,21 @@ public class RocksDBOperationUtils {
         RocksDB dbRef;
 
         try {
+            // Ensure that the working directory exists and is a directory to make RocksDB happy
+            File pathFile = new File(Preconditions.checkNotNull(path));
+
+            if (!pathFile.exists() && !pathFile.mkdirs() && !pathFile.isDirectory()) {
+                throw new IOException(
+                        "Could not create working directory for RocksDB instance: " + path);
+            }
+
             dbRef =
                     RocksDB.open(
                             Preconditions.checkNotNull(dbOptions),
-                            Preconditions.checkNotNull(path),
+                            path,
                             columnFamilyDescriptors,
                             stateColumnFamilyHandles);
-        } catch (RocksDBException e) {
+        } catch (Exception e) {
             IOUtils.closeQuietly(columnFamilyOptions);
             columnFamilyDescriptors.forEach((cfd) -> IOUtils.closeQuietly(cfd.getOptions()));
 

Reply via email to