[hotfix] [RocksDB backend] Minor cleanups to constructors and comments in 
RocksDBStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9ddcb4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9ddcb4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9ddcb4c

Branch: refs/heads/master
Commit: d9ddcb4c297082ef9544c1b5d94c75267662b4c6
Parents: d450bee
Author: Stephan Ewen <[email protected]>
Authored: Mon Nov 7 18:39:32 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Nov 8 15:26:42 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 34 ++++++++------------
 .../state/RocksDBAsyncSnapshotTest.java         |  6 ++--
 .../state/RocksDBStateBackendConfigTest.java    |  3 +-
 .../state/RocksDBStateBackendTest.java          |  2 +-
 .../flink/cep/operator/CEPOperatorTest.java     | 10 ++----
 .../EventTimeWindowCheckpointingITCase.java     |  3 +-
 .../test/state/ManualWindowSpeedITCase.java     | 11 +++----
 7 files changed, 26 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 37c6312..9ba0dc1 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -29,15 +29,15 @@ import 
org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -134,29 +134,23 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
        public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
-               // creating the FsStateBackend automatically sanity checks the 
URI
-               FsStateBackend fsStateBackend = new 
FsStateBackend(checkpointDataUri);
-
-               this.checkpointStreamBackend = fsStateBackend;
-       }
-
-
-       public RocksDBStateBackend(String checkpointDataUri, 
AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-               this(new Path(checkpointDataUri).toUri(), 
nonPartitionedStateBackend);
+               this(new FsStateBackend(checkpointDataUri));
        }
 
-       public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend 
checkpointStreamBackend) throws IOException {
+       /**
+        * Creates a new {@code RocksDBStateBackend} that uses the given state 
backend to store its
+        * checkpoint data streams. Typically, one would supply a filesystem or 
database state backend
+        * here where the snapshots from RocksDB would be stored.
+        * 
+        * <p>The snapshots of the RocksDB state will be stored using the given 
backend's
+        * {@link AbstractStateBackend#createStreamFactory(JobID, String) 
checkpoint stream}. 
+        * 
+        * @param checkpointStreamBackend The backend to store the 
checkpoint data streams of the RocksDB snapshots.
+        */
+       public RocksDBStateBackend(AbstractStateBackend 
checkpointStreamBackend) {
                this.checkpointStreamBackend = 
requireNonNull(checkpointStreamBackend);
        }
 
-       private void writeObject(ObjectOutputStream oos) throws IOException {
-               oos.defaultWriteObject();
-       }
-
-       private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
-               ois.defaultReadObject();
-               isInitialized = false;
-       }
        // 
------------------------------------------------------------------------
        //  State backend methods
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 1c5a91c..f3da93e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -119,9 +119,8 @@ public class RocksDBAsyncSnapshotTest {
                StreamConfig streamConfig = testHarness.getStreamConfig();
 
                File dbDir = new File(new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString()), "state");
-               File chkDir = new File(new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString()), "snapshots");
 
-               RocksDBStateBackend backend = new 
RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
+               RocksDBStateBackend backend = new RocksDBStateBackend(new 
MemoryStateBackend());
                backend.setDbStoragePath(dbDir.getAbsolutePath());
 
                streamConfig.setStateBackend(backend);
@@ -220,11 +219,10 @@ public class RocksDBAsyncSnapshotTest {
                StreamConfig streamConfig = testHarness.getStreamConfig();
 
                File dbDir = new File(new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString()), "state");
-               File chkDir = new File(new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString()), "snapshots");
 
                BlockingStreamMemoryStateBackend memoryStateBackend = new 
BlockingStreamMemoryStateBackend();
 
-               RocksDBStateBackend backend = new 
RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), memoryStateBackend);
+               RocksDBStateBackend backend = new 
RocksDBStateBackend(memoryStateBackend);
                backend.setDbStoragePath(dbDir.getAbsolutePath());
 
                streamConfig.setStateBackend(backend);

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 07fc27c..bf9b315 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -343,8 +343,7 @@ public class RocksDBStateBackendConfigTest {
        @Test
        public void testCallsForwardedToNonPartitionedBackend() throws 
Exception {
                AbstractStateBackend nonPartBackend = 
mock(AbstractStateBackend.class);
-               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath, nonPartBackend);
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(nonPartBackend);
 
                Environment env = getMockEnvironment();
                rocksDbBackend.createStreamFactory(env.getJobID(), "foobar");

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9222f0b..9d25434 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -45,7 +45,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        protected RocksDBStateBackend getStateBackend() throws IOException {
                String dbPath = tempFolder.newFolder().getAbsolutePath();
                String checkpointPath = 
tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend backend = new 
RocksDBStateBackend(checkpointPath, new FsStateBackend(checkpointPath));
+               RocksDBStateBackend backend = new RocksDBStateBackend(new 
FsStateBackend(checkpointPath));
                backend.setDbStoragePath(dbPath);
                return backend;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 0f49b13..6cffd9c 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -301,9 +301,7 @@ public class CEPOperatorTest extends TestLogger {
        public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws 
Exception {
 
                String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
-               String rocksDbBackups = 
tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDBStateBackend =
-                               new RocksDBStateBackend(rocksDbBackups, new 
MemoryStateBackend());
+               RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(new MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
                KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
@@ -350,8 +348,7 @@ public class CEPOperatorTest extends TestLogger {
                                keySelector,
                                BasicTypeInfo.INT_TYPE_INFO);
 
-               rocksDBStateBackend =
-                               new RocksDBStateBackend(rocksDbBackups, new 
MemoryStateBackend());
+               rocksDBStateBackend = new RocksDBStateBackend(new 
MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
                harness.setStateBackend(rocksDBStateBackend);
 
@@ -381,8 +378,7 @@ public class CEPOperatorTest extends TestLogger {
                                keySelector,
                                BasicTypeInfo.INT_TYPE_INFO);
 
-               rocksDBStateBackend =
-                               new RocksDBStateBackend(rocksDbBackups, new 
MemoryStateBackend());
+               rocksDBStateBackend = new RocksDBStateBackend(new 
MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
                harness.setStateBackend(rocksDBStateBackend);
                harness.setup();

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 4dbf5cb..4f28d8c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -120,8 +120,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                        }
                        case ROCKSDB_FULLY_ASYNC: {
                                String rocksDb = 
tempFolder.newFolder().getAbsolutePath();
-                               String rocksDbBackups = 
tempFolder.newFolder().toURI().toString();
-                               RocksDBStateBackend rdb = new 
RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+                               RocksDBStateBackend rdb = new 
RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
                                rdb.setDbStoragePath(rocksDb);
                                this.stateBackend = rdb;
                                break;

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
index 428c47c..456861a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -127,9 +127,8 @@ public class ManualWindowSpeedITCase extends 
StreamingMultipleProgramsTestBase {
 
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
                env.setParallelism(1);
-
-               String checkpoints = tempFolder.newFolder().toURI().toString();
-               env.setStateBackend(new RocksDBStateBackend(checkpoints, new 
MemoryStateBackend()));
+               
+               env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBackend()));
 
                env.addSource(new InfiniteTupleSource(10_000))
                                .keyBy(0)
@@ -163,8 +162,7 @@ public class ManualWindowSpeedITCase extends 
StreamingMultipleProgramsTestBase {
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
                env.setParallelism(1);
 
-               String rocksDbBackups = 
tempFolder.newFolder().toURI().toString();
-               env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new 
MemoryStateBackend()));
+               env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBackend()));
 
                env.addSource(new InfiniteTupleSource(10_000))
                                .keyBy(0)
@@ -199,8 +197,7 @@ public class ManualWindowSpeedITCase extends 
StreamingMultipleProgramsTestBase {
                
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                env.setParallelism(1);
 
-               String rocksDbBackups = 
tempFolder.newFolder().toURI().toString();
-               env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new 
MemoryStateBackend()));
+               env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBackend()));
 
                env.addSource(new InfiniteTupleSource(10_000))
                                .keyBy(0)

Reply via email to