Repository: flink Updated Branches: refs/heads/master db85f3858 -> db6528be0
[FLINK-3730] Fix RocksDB Local Directory Initialization Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db6528be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db6528be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db6528be Branch: refs/heads/master Commit: db6528be00d624d6a389d5b303e565ca6b1c0f40 Parents: db85f38 Author: Aljoscha Krettek <[email protected]> Authored: Tue Apr 12 10:46:59 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Apr 13 10:36:18 2016 +0200 ---------------------------------------------------------------------- .../contrib/streaming/state/RocksDBStateBackend.java | 9 ++++++--- .../streaming/state/RocksDBStateBackendConfigTest.java | 10 +++++++++- 2 files changed, 15 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/db6528be/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index e3b4f4d..8f846da 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -24,6 +24,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.UUID; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.FoldingState; @@ -181,13 +182,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { for (Path path : configuredDbBasePaths) { File f = new File(path.toUri().getPath()); - if (!f.exists() && !f.mkdirs()) { - String msg = "Local DB files directory '" + f.getAbsolutePath() + File testDir = new File(f, UUID.randomUUID().toString()); + if (!testDir.mkdirs()) { + String msg = "Local DB files directory '" + path + "' does not exist and cannot be created. "; LOG.error(msg); errorMessage += msg; + } else { + dirs.add(f); } - dirs.add(f); } if (dirs.isEmpty()) { http://git-wip-us.apache.org/repos/asf/flink/blob/db6528be/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 8e0993b..42ba275 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -20,8 +20,10 @@ package org.apache.flink.contrib.streaming.state; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -133,14 +135,17 @@ public class RocksDBStateBackendConfigTest { RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); - + + boolean hasFailure = false; try { rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE); } catch (Exception e) { assertTrue(e.getMessage().contains("No local storage directories available")); assertTrue(e.getMessage().contains(targetDir.getAbsolutePath())); + hasFailure = true; } + assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure); } finally { //noinspection ResultOfMethodCallIgnored @@ -168,6 +173,9 @@ public class RocksDBStateBackendConfigTest { try { rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE); + + // actually get a state to see whether we can write to the storage directory + rocksDbBackend.getPartitionedState(null, VoidSerializer.INSTANCE, new ValueStateDescriptor<>("test", String.class, "")); } catch (Exception e) { e.printStackTrace();
