[hotfix] [RocksDB backend] Minor cleanups to constructors and comments in RocksDBStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9ddcb4c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9ddcb4c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9ddcb4c Branch: refs/heads/master Commit: d9ddcb4c297082ef9544c1b5d94c75267662b4c6 Parents: d450bee Author: Stephan Ewen <[email protected]> Authored: Mon Nov 7 18:39:32 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Nov 8 15:26:42 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBStateBackend.java | 34 ++++++++------------ .../state/RocksDBAsyncSnapshotTest.java | 6 ++-- .../state/RocksDBStateBackendConfigTest.java | 3 +- .../state/RocksDBStateBackendTest.java | 2 +- .../flink/cep/operator/CEPOperatorTest.java | 10 ++---- .../EventTimeWindowCheckpointingITCase.java | 3 +- .../test/state/ManualWindowSpeedITCase.java | 11 +++---- 7 files changed, 26 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 37c6312..9ba0dc1 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -29,15 +29,15 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; + import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -134,29 +134,23 @@ public class RocksDBStateBackend extends AbstractStateBackend { * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public RocksDBStateBackend(URI checkpointDataUri) throws IOException { - // creating the FsStateBackend automatically sanity checks the URI - FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri); - - this.checkpointStreamBackend = fsStateBackend; - } - - - public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException { - this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend); + this(new FsStateBackend(checkpointDataUri)); } - public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend checkpointStreamBackend) throws IOException { + /** + * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its + * checkpoint data streams. Typically, one would supply a filesystem or database state backend + * here where the snapshots from RocksDB would be stored. + * + * <p>The snapshots of the RocksDB state will be stored using the given backend's + * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}. + * + * @param checkpointStreamBackend The backend to store the + */ + public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) { this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend); } - private void writeObject(ObjectOutputStream oos) throws IOException { - oos.defaultWriteObject(); - } - - private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { - ois.defaultReadObject(); - isInitialized = false; - } // ------------------------------------------------------------------------ // State backend methods // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 1c5a91c..f3da93e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -119,9 +119,8 @@ public class RocksDBAsyncSnapshotTest { StreamConfig streamConfig = testHarness.getStreamConfig(); File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state"); - File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots"); - RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend()); + RocksDBStateBackend backend = new RocksDBStateBackend(new MemoryStateBackend()); backend.setDbStoragePath(dbDir.getAbsolutePath()); streamConfig.setStateBackend(backend); @@ -220,11 +219,10 @@ public class RocksDBAsyncSnapshotTest { StreamConfig streamConfig = testHarness.getStreamConfig(); File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state"); - File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots"); BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend(); - RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), memoryStateBackend); + RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend); backend.setDbStoragePath(dbDir.getAbsolutePath()); streamConfig.setStateBackend(backend); http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 07fc27c..bf9b315 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -343,8 +343,7 @@ public class RocksDBStateBackendConfigTest { @Test public void testCallsForwardedToNonPartitionedBackend() throws Exception { AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class); - String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath, nonPartBackend); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(nonPartBackend); Environment env = getMockEnvironment(); rocksDbBackend.createStreamFactory(env.getJobID(), "foobar"); http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 9222f0b..9d25434 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -45,7 +45,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa protected RocksDBStateBackend getStateBackend() throws IOException { String dbPath = tempFolder.newFolder().getAbsolutePath(); String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend backend = new RocksDBStateBackend(checkpointPath, new FsStateBackend(checkpointPath)); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath)); backend.setDbStoragePath(dbPath); return backend; } http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 0f49b13..6cffd9c 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -301,9 +301,7 @@ public class CEPOperatorTest extends TestLogger { public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception { String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); - String rocksDbBackups = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDBStateBackend = - new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); + RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { @@ -350,8 +348,7 @@ public class CEPOperatorTest extends TestLogger { keySelector, BasicTypeInfo.INT_TYPE_INFO); - rocksDBStateBackend = - new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); + rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); harness.setStateBackend(rocksDBStateBackend); @@ -381,8 +378,7 @@ public class CEPOperatorTest extends TestLogger { keySelector, BasicTypeInfo.INT_TYPE_INFO); - rocksDBStateBackend = - new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); + rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); harness.setStateBackend(rocksDBStateBackend); harness.setup(); http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 4dbf5cb..4f28d8c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -120,8 +120,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { } case ROCKSDB_FULLY_ASYNC: { String rocksDb = tempFolder.newFolder().getAbsolutePath(); - String rocksDbBackups = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE)); + RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE)); rdb.setDbStoragePath(rocksDb); this.stateBackend = rdb; break; http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java index 428c47c..456861a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java @@ -127,9 +127,8 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase { env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(1); - - String checkpoints = tempFolder.newFolder().toURI().toString(); - env.setStateBackend(new RocksDBStateBackend(checkpoints, new MemoryStateBackend())); + + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); env.addSource(new InfiniteTupleSource(10_000)) .keyBy(0) @@ -163,8 +162,7 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase { env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(1); - String rocksDbBackups = tempFolder.newFolder().toURI().toString(); - env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend())); + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); env.addSource(new InfiniteTupleSource(10_000)) .keyBy(0) @@ -199,8 +197,7 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase { env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.setParallelism(1); - String rocksDbBackups = tempFolder.newFolder().toURI().toString(); - env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend())); + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); env.addSource(new InfiniteTupleSource(10_000)) .keyBy(0)
