Repository: flink
Updated Branches:
  refs/heads/master db85f3858 -> db6528be0


[FLINK-3730] Fix RocksDB Local Directory Initialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db6528be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db6528be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db6528be

Branch: refs/heads/master
Commit: db6528be00d624d6a389d5b303e565ca6b1c0f40
Parents: db85f38
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Apr 12 10:46:59 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Apr 13 10:36:18 2016 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBStateBackend.java      |  9 ++++++---
 .../streaming/state/RocksDBStateBackendConfigTest.java    | 10 +++++++++-
 2 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db6528be/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index e3b4f4d..8f846da 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
@@ -181,13 +182,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        
                        for (Path path : configuredDbBasePaths) {
                                File f = new File(path.toUri().getPath());
-                               if (!f.exists() && !f.mkdirs()) {
-                                       String msg = "Local DB files directory 
'" + f.getAbsolutePath()
+                               File testDir = new File(f, 
UUID.randomUUID().toString());
+                               if (!testDir.mkdirs()) {
+                                       String msg = "Local DB files directory 
'" + path
                                                        + "' does not exist and 
cannot be created. ";
                                        LOG.error(msg);
                                        errorMessage += msg;
+                               } else {
+                                       dirs.add(f);
                                }
-                               dirs.add(f);
                        }
                        
                        if (dirs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/db6528be/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8e0993b..42ba275 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -133,14 +135,17 @@ public class RocksDBStateBackendConfigTest {
                        
                        RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
                        
rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
-                       
+
+                       boolean hasFailure = false;
                        try {
                                
rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", 
IntSerializer.INSTANCE);
                        }
                        catch (Exception e) {
                                assertTrue(e.getMessage().contains("No local 
storage directories available"));
                                
assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
+                               hasFailure = true;
                        }
+                       assertTrue("We must see a failure because no storaged 
directory is feasible.", hasFailure);
                }
                finally {
                        //noinspection ResultOfMethodCallIgnored
@@ -168,6 +173,9 @@ public class RocksDBStateBackendConfigTest {
        
                        try {
                                
rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", 
IntSerializer.INSTANCE);
+
+                               // actually get a state to see whether we can 
write to the storage directory
+                               rocksDbBackend.getPartitionedState(null, 
VoidSerializer.INSTANCE, new ValueStateDescriptor<>("test", String.class, ""));
                        }
                        catch (Exception e) {
                                e.printStackTrace();

Reply via email to