http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
deleted file mode 100644
index 3a39ba0..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.util.IOUtils;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.DBOptions;
-
-import java.io.File;
-
-import static org.hamcrest.CoreMatchers.anyOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.startsWith;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for configuring the RocksDB State Backend.
- */
-@SuppressWarnings("serial")
-public class RocksDBStateBackendConfigTest {
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       // ------------------------------------------------------------------------
-       //  RocksDB local file directory
-       // ------------------------------------------------------------------------
-
-       @Test
-       public void testDefaultsInSync() throws Exception {
-               final boolean defaultIncremental = CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
-
-               RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI());
-               assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled());
-       }
-
-       @Test
-       public void testSetDbPath() throws Exception {
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               File testDir1 = tempFolder.newFolder();
-               File testDir2 = tempFolder.newFolder();
-
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-
-               assertNull(rocksDbBackend.getDbStoragePaths());
-
-               rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath());
-               assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths());
-
-               rocksDbBackend.setDbStoragePath(null);
-               assertNull(rocksDbBackend.getDbStoragePaths());
-
-               rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath());
-               assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString(), new Path(testDir2.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths());
-
-               Environment env = getMockEnvironment();
-               RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
-                               createKeyedStateBackend(
-                                               env,
-                                               env.getJobID(),
-                                               "test_op",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               env.getTaskKvStateRegistry());
-
-               try {
-                       File instanceBasePath = keyedBackend.getInstanceBasePath();
-                       assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
-
-                       //noinspection NullArgumentToVariableArgMethod
-                       rocksDbBackend.setDbStoragePaths(null);
-                       assertNull(rocksDbBackend.getDbStoragePaths());
-               } finally {
-                       IOUtils.closeQuietly(keyedBackend);
-                       keyedBackend.dispose();
-               }
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testSetNullPaths() throws Exception {
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-               rocksDbBackend.setDbStoragePaths();
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testNonFileSchemePath() throws Exception {
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-               rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
-       }
-
-       // ------------------------------------------------------------------------
-       //  RocksDB local file automatic from temp directories
-       // ------------------------------------------------------------------------
-
-       /**
-        * This tests whether the RocksDB backend uses the temp directories that are provided
-        * from the {@link Environment} when no db storage path is set.
-        */
-       @Test
-       public void testUseTempDirectories() throws Exception {
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-
-               File dir1 = tempFolder.newFolder();
-               File dir2 = tempFolder.newFolder();
-
-               File[] tempDirs = new File[] { dir1, dir2 };
-
-               assertNull(rocksDbBackend.getDbStoragePaths());
-
-               Environment env = getMockEnvironment(tempDirs);
-               RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
-                               createKeyedStateBackend(
-                                               env,
-                                               env.getJobID(),
-                                               "test_op",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               env.getTaskKvStateRegistry());
-
-               try {
-                       File instanceBasePath = keyedBackend.getInstanceBasePath();
-                       assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
-               } finally {
-                       IOUtils.closeQuietly(keyedBackend);
-                       keyedBackend.dispose();
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  RocksDB local file directory initialization
-       // ------------------------------------------------------------------------
-
-       @Test
-       public void testFailWhenNoLocalStorageDir() throws Exception {
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-               File targetDir = tempFolder.newFolder();
-
-               try {
-                       if (!targetDir.setWritable(false, false)) {
-                               System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because the directory cannot be marked non-writable");
-                               return;
-                       }
-
-                       rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
-
-                       boolean hasFailure = false;
-                       try {
-                               Environment env = getMockEnvironment();
-                               rocksDbBackend.createKeyedStateBackend(
-                                               env,
-                                               env.getJobID(),
-                                               "foobar",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
-                       }
-                       catch (Exception e) {
-                               assertTrue(e.getMessage().contains("No local storage directories available"));
-                               assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
-                               hasFailure = true;
-                       }
-                       assertTrue("We must see a failure because no storage directory is feasible.", hasFailure);
-               }
-               finally {
-                       //noinspection ResultOfMethodCallIgnored
-                       targetDir.setWritable(true, false);
-               }
-       }
-
-       @Test
-       public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
-               File targetDir1 = tempFolder.newFolder();
-               File targetDir2 = tempFolder.newFolder();
-
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-
-               try {
-
-                       if (!targetDir1.setWritable(false, false)) {
-                               System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because the directory cannot be marked non-writable");
-                               return;
-                       }
-
-                       rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath());
-
-                       try {
-                               Environment env = getMockEnvironment();
-                               AbstractKeyedStateBackend<Integer> keyedStateBackend = rocksDbBackend.createKeyedStateBackend(
-                                       env,
-                                       env.getJobID(),
-                                       "foobar",
-                                       IntSerializer.INSTANCE,
-                                       1,
-                                       new KeyGroupRange(0, 0),
-                                       new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
-
-                               IOUtils.closeQuietly(keyedStateBackend);
-                               keyedStateBackend.dispose();
-                       }
-                       catch (Exception e) {
-                               e.printStackTrace();
-                               fail("Backend initialization failed even though some paths were available");
-                       }
-               } finally {
-                       //noinspection ResultOfMethodCallIgnored
-                       targetDir1.setWritable(true, false);
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  RocksDB Options
-       // ------------------------------------------------------------------------
-
-       @Test
-       public void testPredefinedOptions() throws Exception {
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-
-               assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
-
-               rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
-               assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
-
-               try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
-                       assertEquals(CompactionStyle.LEVEL, colCreated.compactionStyle());
-               }
-       }
-
-       @Test
-       public void testOptionsFactory() throws Exception {
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-
-               rocksDbBackend.setOptions(new OptionsFactory() {
-                       @Override
-                       public DBOptions createDBOptions(DBOptions currentOptions) {
-                               return currentOptions;
-                       }
-
-                       @Override
-                       public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
-                               return currentOptions.setCompactionStyle(CompactionStyle.FIFO);
-                       }
-               });
-
-               assertNotNull(rocksDbBackend.getOptions());
-               assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle());
-       }
-
-       @Test
-       public void testPredefinedAndOptionsFactory() throws Exception {
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
-
-               assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
-
-               rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
-               rocksDbBackend.setOptions(new OptionsFactory() {
-                       @Override
-                       public DBOptions createDBOptions(DBOptions currentOptions) {
-                               return currentOptions;
-                       }
-
-                       @Override
-                       public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
-                               return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
-                       }
-               });
-
-               assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
-               assertNotNull(rocksDbBackend.getOptions());
-               assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle());
-       }
-
-       @Test
-       public void testPredefinedOptionsEnum() {
-               for (PredefinedOptions o : PredefinedOptions.values()) {
-                       try (DBOptions opt = o.createDBOptions()) {
-                               assertNotNull(opt);
-                       }
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Reconfiguration
-       // ------------------------------------------------------------------------
-
-       @Test
-       public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {
-               final FsStateBackend checkpointBackend = new FsStateBackend(tempFolder.newFolder().toURI().toString());
-               final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
-
-               final RocksDBStateBackend original = new RocksDBStateBackend(checkpointBackend, incremental);
-
-               // these must not be the default options
-               final PredefinedOptions predOptions = PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
-               assertNotEquals(predOptions, original.getPredefinedOptions());
-               original.setPredefinedOptions(predOptions);
-
-               final OptionsFactory optionsFactory = mock(OptionsFactory.class);
-               original.setOptions(optionsFactory);
-
-               final String[] localDirs = new String[] {
-                               tempFolder.newFolder().getAbsolutePath(), tempFolder.newFolder().getAbsolutePath() };
-               original.setDbStoragePaths(localDirs);
-
-               RocksDBStateBackend copy = original.configure(new Configuration());
-
-               assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
-               assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
-               assertEquals(original.getOptions(), copy.getOptions());
-               assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());
-
-               FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();
-               assertEquals(checkpointBackend.getCheckpointPath(), copyCheckpointBackend.getCheckpointPath());
-               assertEquals(checkpointBackend.getSavepointPath(), copyCheckpointBackend.getSavepointPath());
-       }
-
-       // ------------------------------------------------------------------------
-       //  Contained Non-partitioned State Backend
-       // ------------------------------------------------------------------------
-
-       @Test
-       public void testCallsForwardedToNonPartitionedBackend() throws Exception {
-               AbstractStateBackend storageBackend = new MemoryStateBackend();
-               RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend);
-               assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend());
-       }
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       static Environment getMockEnvironment() {
-               return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
-       }
-
-       static Environment getMockEnvironment(File[] tempDirs) {
-               final String[] tempDirStrings = new String[tempDirs.length];
-               for (int i = 0; i < tempDirs.length; i++) {
-                       tempDirStrings[i] = tempDirs[i].getAbsolutePath();
-               }
-
-               IOManager ioMan = mock(IOManager.class);
-               when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
-
-               Environment env = mock(Environment.class);
-               when(env.getJobID()).thenReturn(new JobID());
-               when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
-               when(env.getIOManager()).thenReturn(ioMan);
-               when(env.getTaskKvStateRegistry()).thenReturn(new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
-
-               TaskInfo taskInfo = mock(TaskInfo.class);
-               when(env.getTaskInfo()).thenReturn(taskInfo);
-               when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
-
-               TaskManagerRuntimeInfo tmInfo = new TestingTaskManagerRuntimeInfo(new Configuration(), tempDirStrings);
-               when(env.getTaskManagerInfo()).thenReturn(tmInfo);
-
-               return env;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
deleted file mode 100644
index 7612c4c..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateBackendLoader;
-import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the RocksDBStateBackendFactory.
- */
-public class RocksDBStateBackendFactoryTest {
-
-       @Rule
-       public final TemporaryFolder tmp = new TemporaryFolder();
-
-       private final ClassLoader cl = getClass().getClassLoader();
-
-       private final String backendKey = CheckpointingOptions.STATE_BACKEND.key();
-
-       // ------------------------------------------------------------------------
-
-       @Test
-       public void testFactoryName() {
-               // construct the name such that it will not be automatically adjusted on refactorings
-               String factoryName = "org.apache.flink.contrib.streaming.state.Roc";
-               factoryName += "ksDBStateBackendFactory";
-
-               // !!! if this fails, the code in StateBackendLoader must be adjusted
-               assertEquals(factoryName, RocksDBStateBackendFactory.class.getName());
-       }
-
-       /**
-        * Validates loading a file system state backend with additional parameters from the cluster configuration.
-        */
-       @Test
-       public void testLoadFileSystemStateBackend() throws Exception {
-               final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
-               final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
-               final String localDir1 = tmp.newFolder().getAbsolutePath();
-               final String localDir2 = tmp.newFolder().getAbsolutePath();
-               final String localDirs = localDir1 + File.pathSeparator + localDir2;
-               final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
-
-               final Path expectedCheckpointsPath = new Path(checkpointDir);
-               final Path expectedSavepointsPath = new Path(savepointDir);
-
-               // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
-               // to guard against config-breaking changes of the name
-               final Configuration config1 = new Configuration();
-               config1.setString(backendKey, "rocksdb");
-               config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
-               config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-               config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
-               config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
-
-               final Configuration config2 = new Configuration();
-               config2.setString(backendKey, RocksDBStateBackendFactory.class.getName());
-               config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
-               config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-               config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
-               config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
-
-               StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
-               StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
-
-               assertTrue(backend1 instanceof RocksDBStateBackend);
-               assertTrue(backend2 instanceof RocksDBStateBackend);
-
-               RocksDBStateBackend fs1 = (RocksDBStateBackend) backend1;
-               RocksDBStateBackend fs2 = (RocksDBStateBackend) backend2;
-
-               AbstractFileStateBackend fs1back = (AbstractFileStateBackend) fs1.getCheckpointBackend();
-               AbstractFileStateBackend fs2back = (AbstractFileStateBackend) fs2.getCheckpointBackend();
-
-               assertEquals(expectedCheckpointsPath, fs1back.getCheckpointPath());
-               assertEquals(expectedCheckpointsPath, fs2back.getCheckpointPath());
-               assertEquals(expectedSavepointsPath, fs1back.getSavepointPath());
-               assertEquals(expectedSavepointsPath, fs2back.getSavepointPath());
-               assertEquals(incremental, fs1.isIncrementalCheckpointsEnabled());
-               assertEquals(incremental, fs2.isIncrementalCheckpointsEnabled());
-               checkPaths(fs1.getDbStoragePaths(), localDir1, localDir2);
-               checkPaths(fs2.getDbStoragePaths(), localDir1, localDir2);
-       }
-
-       /**
-        * Validates taking the application-defined file system state backend and enriching it
-        * with additional parameters from the cluster configuration, but giving precedence to
-        * application-defined parameters over configuration-defined parameters.
-        */
-       @Test
-       public void testLoadFileSystemStateBackendMixed() throws Exception {
-               final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
-               final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
-               final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
-
-               final String localDir1 = tmp.newFolder().getAbsolutePath();
-               final String localDir2 = tmp.newFolder().getAbsolutePath();
-               final String localDir3 = tmp.newFolder().getAbsolutePath();
-               final String localDir4 = tmp.newFolder().getAbsolutePath();
-
-               final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
-
-               final Path expectedCheckpointsPath = new Path(appCheckpointDir);
-               final Path expectedSavepointsPath = new Path(savepointDir);
-
-               final RocksDBStateBackend backend = new RocksDBStateBackend(appCheckpointDir, incremental);
-               backend.setDbStoragePaths(localDir1, localDir2);
-
-               final Configuration config = new Configuration();
-               config.setString(backendKey, "jobmanager"); // this should not be picked up
-               config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
-               config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-               config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, !incremental);  // this should not be picked up
-               config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + ":" + localDir4);  // this should not be picked up
-
-               final StateBackend loadedBackend =
-                               StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
-               assertTrue(loadedBackend instanceof RocksDBStateBackend);
-
-               final RocksDBStateBackend loadedRocks = (RocksDBStateBackend) loadedBackend;
-
-               assertEquals(incremental, loadedRocks.isIncrementalCheckpointsEnabled());
-               checkPaths(loadedRocks.getDbStoragePaths(), localDir1, localDir2);
-
-               AbstractFileStateBackend fsBackend = (AbstractFileStateBackend) loadedRocks.getCheckpointBackend();
-               assertEquals(expectedCheckpointsPath, fsBackend.getCheckpointPath());
-               assertEquals(expectedSavepointsPath, fsBackend.getSavepointPath());
-       }
-
-       // ------------------------------------------------------------------------
-
-       private static void checkPaths(String[] pathsArray, String... paths) {
-               assertNotNull(pathsArray);
-               assertNotNull(paths);
-
-               assertEquals(pathsArray.length, paths.length);
-
-               HashSet<String> pathsSet = new HashSet<>(Arrays.asList(pathsArray));
-
-               for (String path : paths) {
-                       assertTrue(pathsSet.contains(path));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
deleted file mode 100644
index 54af400..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StateBackendTestBase;
-import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.filefilter.IOFileFilter;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.RocksObject;
-import org.rocksdb.Snapshot;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.RunnableFuture;
-
-import static junit.framework.TestCase.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-import static org.mockito.internal.verification.VerificationModeFactory.times;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.spy;
-
-/**
- * Tests for the partitioned state part of {@link RocksDBStateBackend}.
- */
-@RunWith(Parameterized.class)
-public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> {
-
-       private OneShotLatch blocker;
-       private OneShotLatch waiter;
-       private BlockerCheckpointStreamFactory testStreamFactory;
-       private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
-       private List<RocksObject> allCreatedCloseables;
-       private ValueState<Integer> testState1;
-       private ValueState<String> testState2;
-
-       @Parameterized.Parameters(name = "Incremental checkpointing: {0}")
-       public static Collection<Boolean> parameters() {
-               return Arrays.asList(false, true);
-       }
-
-       @Parameterized.Parameter
-       public boolean enableIncrementalCheckpointing;
-
-       @Rule
-       public final TemporaryFolder tempFolder = new TemporaryFolder();
-
-       // Store it because we need it for the cleanup test.
-       private String dbPath;
-
-       @Override
-       protected RocksDBStateBackend getStateBackend() throws IOException {
-               dbPath = tempFolder.newFolder().getAbsolutePath();
-               String checkpointPath = tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), enableIncrementalCheckpointing);
-               backend.setDbStoragePath(dbPath);
-               return backend;
-       }
-
-       // small safety net for instance cleanups, so that no native objects are left
-       @After
-       public void cleanupRocksDB() {
-               if (keyedStateBackend != null) {
-                       IOUtils.closeQuietly(keyedStateBackend);
-                       keyedStateBackend.dispose();
-               }
-
-               if (allCreatedCloseables != null) {
-                       for (RocksObject rocksCloseable : allCreatedCloseables) {
-                               verify(rocksCloseable, times(1)).close();
-                       }
-                       allCreatedCloseables = null;
-               }
-       }
-
-       public void setupRocksKeyedStateBackend() throws Exception {
-
-               blocker = new OneShotLatch();
-               waiter = new OneShotLatch();
-               testStreamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
-               testStreamFactory.setBlockerLatch(blocker);
-               testStreamFactory.setWaiterLatch(waiter);
-               testStreamFactory.setAfterNumberInvocations(10);
-
-               RocksDBStateBackend backend = getStateBackend();
-               Environment env = new DummyEnvironment("TestTask", 1, 0);
-
-               keyedStateBackend = (RocksDBKeyedStateBackend<Integer>) backend.createKeyedStateBackend(
-                               env,
-                               new JobID(),
-                               "Test",
-                               IntSerializer.INSTANCE,
-                               2,
-                               new KeyGroupRange(0, 1),
-                               mock(TaskKvStateRegistry.class));
-
-               keyedStateBackend.restore(null);
-
-               testState1 = keyedStateBackend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               new ValueStateDescriptor<>("TestState-1", Integer.class, 0));
-
-               testState2 = keyedStateBackend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               new ValueStateDescriptor<>("TestState-2", String.class, ""));
-
-               allCreatedCloseables = new ArrayList<>();
-
-               keyedStateBackend.db = spy(keyedStateBackend.db);
-
-               doAnswer(new Answer<Object>() {
-
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                               RocksIterator rocksIterator = spy((RocksIterator) invocationOnMock.callRealMethod());
-                               allCreatedCloseables.add(rocksIterator);
-                               return rocksIterator;
-                       }
-               }).when(keyedStateBackend.db).newIterator(any(ColumnFamilyHandle.class), any(ReadOptions.class));
-
-               doAnswer(new Answer<Object>() {
-
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                               Snapshot snapshot = spy((Snapshot) invocationOnMock.callRealMethod());
-                               allCreatedCloseables.add(snapshot);
-                               return snapshot;
-                       }
-               }).when(keyedStateBackend.db).getSnapshot();
-
-               doAnswer(new Answer<Object>() {
-
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                               ColumnFamilyHandle snapshot = spy((ColumnFamilyHandle) invocationOnMock.callRealMethod());
-                               allCreatedCloseables.add(snapshot);
-                               return snapshot;
-                       }
-               }).when(keyedStateBackend.db).createColumnFamily(any(ColumnFamilyDescriptor.class));
-
-               for (int i = 0; i < 100; ++i) {
-                       keyedStateBackend.setCurrentKey(i);
-                       testState1.update(4200 + i);
-                       testState2.update("S-" + (4200 + i));
-               }
-       }
-
-       @Test
-       public void testCorrectMergeOperatorSet() throws IOException {
-
-               final ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions());
-               RocksDBKeyedStateBackend<Integer> test = null;
-               try {
-                       test = new RocksDBKeyedStateBackend<>(
-                               "test",
-                               Thread.currentThread().getContextClassLoader(),
-                               tempFolder.newFolder(),
-                               mock(DBOptions.class),
-                               columnFamilyOptions,
-                               mock(TaskKvStateRegistry.class),
-                               IntSerializer.INSTANCE,
-                               1,
-                               new KeyGroupRange(0, 0),
-                               new ExecutionConfig(),
-                               enableIncrementalCheckpointing);
-
-                       verify(columnFamilyOptions, Mockito.times(1))
-                               .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
-               } finally {
-                       if (test != null) {
-                               IOUtils.closeQuietly(test);
-                               test.dispose();
-                       }
-                       columnFamilyOptions.close();
-               }
-       }
-
-       @Test
-       public void testReleasingSnapshotAfterBackendClosed() throws Exception {
-               setupRocksKeyedStateBackend();
-
-               try {
-                       RunnableFuture<KeyedStateHandle> snapshot =
-                               keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-                       RocksDB spyDB = keyedStateBackend.db;
-
-                       if (!enableIncrementalCheckpointing) {
-                               verify(spyDB, times(1)).getSnapshot();
-                               verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
-                       }
-
-                       // Ensure that no RocksObject has been closed yet
-                       for (RocksObject rocksCloseable : allCreatedCloseables) {
-                               verify(rocksCloseable, times(0)).close();
-                       }
-
-                       snapshot.cancel(true);
-
-                       this.keyedStateBackend.dispose();
-
-                       verify(spyDB, times(1)).close();
-                       assertEquals(null, keyedStateBackend.db);
-
-                       // Ensure every RocksObject was closed exactly once
-                       for (RocksObject rocksCloseable : allCreatedCloseables) {
-                               verify(rocksCloseable, times(1)).close();
-                       }
-
-               } finally {
-                       keyedStateBackend.dispose();
-                       keyedStateBackend = null;
-               }
-       }
-
-       @Test
-       public void testDismissingSnapshot() throws Exception {
-               setupRocksKeyedStateBackend();
-               try {
-                       RunnableFuture<KeyedStateHandle> snapshot =
-                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-                       snapshot.cancel(true);
-                       verifyRocksObjectsReleased();
-               } finally {
-                       this.keyedStateBackend.dispose();
-                       this.keyedStateBackend = null;
-               }
-       }
-
-       @Test
-       public void testDismissingSnapshotNotRunnable() throws Exception {
-               setupRocksKeyedStateBackend();
-               try {
-                       RunnableFuture<KeyedStateHandle> snapshot =
-                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-                       snapshot.cancel(true);
-                       Thread asyncSnapshotThread = new Thread(snapshot);
-                       asyncSnapshotThread.start();
-                       try {
-                               snapshot.get();
-                               fail();
-                       } catch (Exception ignored) {
-
-                       }
-                       asyncSnapshotThread.join();
-                       verifyRocksObjectsReleased();
-               } finally {
-                       this.keyedStateBackend.dispose();
-                       this.keyedStateBackend = null;
-               }
-       }
-
-       @Test
-       public void testCompletingSnapshot() throws Exception {
-               setupRocksKeyedStateBackend();
-               try {
-                       RunnableFuture<KeyedStateHandle> snapshot =
-                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-                       Thread asyncSnapshotThread = new Thread(snapshot);
-                       asyncSnapshotThread.start();
-                       waiter.await(); // wait for snapshot to run
-                       waiter.reset();
-                       runStateUpdates();
-                       blocker.trigger(); // allow checkpointing to start 
writing
-                       waiter.await(); // wait for snapshot stream writing to 
run
-                       KeyedStateHandle keyedStateHandle = snapshot.get();
-                       assertNotNull(keyedStateHandle);
-                       assertTrue(keyedStateHandle.getStateSize() > 0);
-                       assertEquals(2, 
keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
-                       
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
-                       asyncSnapshotThread.join();
-                       verifyRocksObjectsReleased();
-               } finally {
-                       this.keyedStateBackend.dispose();
-                       this.keyedStateBackend = null;
-               }
-       }
-
-       @Test
-       public void testCancelRunningSnapshot() throws Exception {
-               setupRocksKeyedStateBackend();
-               try {
-                       RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-                       Thread asyncSnapshotThread = new Thread(snapshot);
-                       asyncSnapshotThread.start();
-                       waiter.await(); // wait for snapshot to run
-                       waiter.reset();
-                       runStateUpdates();
-                       snapshot.cancel(true);
-                       blocker.trigger(); // allow checkpointing to start 
writing
-                       
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
-                       waiter.await(); // wait for snapshot stream writing to 
run
-                       try {
-                               snapshot.get();
-                               fail();
-                       } catch (Exception ignored) {
-                       }
-
-                       asyncSnapshotThread.join();
-                       verifyRocksObjectsReleased();
-               } finally {
-                       this.keyedStateBackend.dispose();
-                       this.keyedStateBackend = null;
-               }
-       }
-
-       @Test
-       public void testDisposeDeletesAllDirectories() throws Exception {
-               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
-               Collection<File> allFilesInDbDir =
-                       FileUtils.listFilesAndDirs(new File(dbPath), new 
AcceptAllFilter(), new AcceptAllFilter());
-               try {
-                       ValueStateDescriptor<String> kvId =
-                               new ValueStateDescriptor<>("id", String.class, 
null);
-
-                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
-
-                       ValueState<String> state =
-                               
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-
-                       backend.setCurrentKey(1);
-                       state.update("Hello");
-
-                       // more than just the root directory
-                       assertTrue(allFilesInDbDir.size() > 1);
-               } finally {
-                       IOUtils.closeQuietly(backend);
-                       backend.dispose();
-               }
-               allFilesInDbDir =
-                       FileUtils.listFilesAndDirs(new File(dbPath), new 
AcceptAllFilter(), new AcceptAllFilter());
-
-               // just the root directory left
-               assertEquals(1, allFilesInDbDir.size());
-       }
-
-       @Test
-       public void testSharedIncrementalStateDeRegistration() throws Exception 
{
-               if (enableIncrementalCheckpointing) {
-                       AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
-                       try {
-                               ValueStateDescriptor<String> kvId =
-                                       new ValueStateDescriptor<>("id", 
String.class, null);
-
-                               kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
-
-                               ValueState<String> state =
-                                       
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
-
-                               Queue<IncrementalKeyedStateHandle> 
previousStateHandles = new LinkedList<>();
-                               SharedStateRegistry sharedStateRegistry = 
spy(new SharedStateRegistry());
-                               for (int checkpointId = 0; checkpointId < 3; 
++checkpointId) {
-
-                                       reset(sharedStateRegistry);
-
-                                       backend.setCurrentKey(checkpointId);
-                                       state.update("Hello-" + checkpointId);
-
-                                       RunnableFuture<KeyedStateHandle> 
snapshot = backend.snapshot(
-                                               checkpointId,
-                                               checkpointId,
-                                               createStreamFactory(),
-                                               
CheckpointOptions.forCheckpointWithDefaultLocation());
-
-                                       snapshot.run();
-
-                                       IncrementalKeyedStateHandle stateHandle 
= (IncrementalKeyedStateHandle) snapshot.get();
-                                       Map<StateHandleID, StreamStateHandle> 
sharedState =
-                                               new 
HashMap<>(stateHandle.getSharedState());
-
-                                       
stateHandle.registerSharedStates(sharedStateRegistry);
-
-                                       for (Map.Entry<StateHandleID, 
StreamStateHandle> e : sharedState.entrySet()) {
-                                               
verify(sharedStateRegistry).registerReference(
-                                                       
stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
-                                                       e.getValue());
-                                       }
-
-                                       previousStateHandles.add(stateHandle);
-                                       
backend.notifyCheckpointComplete(checkpointId);
-
-                                       
//-----------------------------------------------------------------
-
-                                       if (previousStateHandles.size() > 1) {
-                                               
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
-                                       }
-                               }
-
-                               while (!previousStateHandles.isEmpty()) {
-
-                                       reset(sharedStateRegistry);
-
-                                       
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
-                               }
-                       } finally {
-                               IOUtils.closeQuietly(backend);
-                               backend.dispose();
-                       }
-               }
-       }
-
-       private void checkRemove(IncrementalKeyedStateHandle remove, 
SharedStateRegistry registry) throws Exception {
-               for (StateHandleID id : remove.getSharedState().keySet()) {
-                       verify(registry, times(0)).unregisterReference(
-                               
remove.createSharedStateRegistryKeyFromFileName(id));
-               }
-
-               remove.discardState();
-
-               for (StateHandleID id : remove.getSharedState().keySet()) {
-                       verify(registry).unregisterReference(
-                               
remove.createSharedStateRegistryKeyFromFileName(id));
-               }
-       }
-
-       private void runStateUpdates() throws Exception{
-               for (int i = 50; i < 150; ++i) {
-                       if (i % 10 == 0) {
-                               Thread.sleep(1);
-                       }
-                       keyedStateBackend.setCurrentKey(i);
-                       testState1.update(4200 + i);
-                       testState2.update("S-" + (4200 + i));
-               }
-       }
-
-       private void verifyRocksObjectsReleased() {
-               // Ensure every RocksObject was closed exactly once
-               for (RocksObject rocksCloseable : allCreatedCloseables) {
-                       verify(rocksCloseable, times(1)).close();
-               }
-
-               assertNotNull(keyedStateBackend.db);
-               RocksDB spyDB = keyedStateBackend.db;
-
-               if (!enableIncrementalCheckpointing) {
-                       verify(spyDB, times(1)).getSnapshot();
-                       verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class));
-               }
-
-               keyedStateBackend.dispose();
-               verify(spyDB, times(1)).close();
-               assertNull(keyedStateBackend.db);
-       }
-
-       private static class AcceptAllFilter implements IOFileFilter {
-               @Override
-               public boolean accept(File file) {
-                       return true;
-               }
-
-               @Override
-               public boolean accept(File file, String s) {
-                       return true;
-               }
-       }
-}
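
The deleted test above exercises the reference counting that the SharedStateRegistry performs for the files of incremental checkpoints. As a minimal sketch of that contract (the class SimpleSharedStateRegistry below is hypothetical, for illustration only, and not Flink's actual implementation):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the reference-counting contract verified above:
// every retained checkpoint registers each shared file once, and a file may
// only be physically deleted once its count drops back to zero.
class SimpleSharedStateRegistry {

        private final Map<String, Integer> refCounts = new HashMap<>();

        /** Called when a completed checkpoint introduces or re-uses a shared file. */
        void registerReference(String registrationKey) {
                refCounts.merge(registrationKey, 1, Integer::sum);
        }

        /** Called when a checkpoint is subsumed; returns true if the file is now unreferenced. */
        boolean unregisterReference(String registrationKey) {
                Integer newCount = refCounts.merge(registrationKey, -1, Integer::sum);
                if (newCount != null && newCount <= 0) {
                        refCounts.remove(registrationKey);
                        return true; // safe to delete the backing file
                }
                return false;
        }
}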

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
deleted file mode 100644
index 72c85ec..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.RocksDB;
-
-import java.lang.reflect.Method;
-import java.net.URL;
-
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * This test validates that the RocksDB JNI library loading works properly
- * in the presence of the RocksDB code being loaded dynamically via reflection.
- * That can happen when RocksDB is in the user code JAR, or in certain test setups.
- */
-public class RocksDbMultiClassLoaderTest {
-
-       @Rule
-       public final TemporaryFolder tmp = new TemporaryFolder();
-
-       @Test
-       public void testTwoSeparateClassLoaders() throws Exception {
-               // collect the libraries / class folders with RocksDB related code: the state backend and RocksDB itself
-               final URL codePath1 = RocksDBStateBackend.class.getProtectionDomain().getCodeSource().getLocation();
-               final URL codePath2 = RocksDB.class.getProtectionDomain().getCodeSource().getLocation();
-
-               final ClassLoader parent = getClass().getClassLoader();
-               final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent, new String[0]);
-               final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent, new String[0]);
-
-               final String className = RocksDBStateBackend.class.getName();
-
-               final Class<?> clazz1 = Class.forName(className, false, loader1);
-               final Class<?> clazz2 = Class.forName(className, false, loader2);
-               assertNotEquals("Test broken - the two reflectively loaded classes are equal", clazz1, clazz2);
-
-               final Object instance1 = clazz1.getConstructor(String.class).newInstance(tmp.newFolder().toURI().toString());
-               final Object instance2 = clazz2.getConstructor(String.class).newInstance(tmp.newFolder().toURI().toString());
-
-               final String tempDir = tmp.newFolder().getAbsolutePath();
-
-               final Method meth1 = clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
-               final Method meth2 = clazz2.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
-               meth1.setAccessible(true);
-               meth2.setAccessible(true);
-
-               // if all is well, these methods can both complete successfully
-               meth1.invoke(instance1, tempDir);
-               meth2.invoke(instance2, tempDir);
-       }
-}
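
For background on the child-first loading this test depends on, here is a minimal sketch of a child-first class loader. It illustrates the concept only and is not FlinkUserCodeClassLoaders' actual implementation (which, among other things, also honors a list of always-parent-first patterns):

import java.net.URL;
import java.net.URLClassLoader;

// Sketch: resolve classes from the child's own URLs first, falling back to the
// parent only when the child cannot find the class. Two such loaders therefore
// produce two distinct Class objects for the same class name, as asserted above.
class ChildFirstClassLoader extends URLClassLoader {

        ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
                super(urls, parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
                synchronized (getClassLoadingLock(name)) {
                        Class<?> c = findLoadedClass(name);
                        if (c == null) {
                                try {
                                        c = findClass(name); // child first
                                } catch (ClassNotFoundException e) {
                                        c = super.loadClass(name, false); // parent as fallback
                                }
                        }
                        if (resolve) {
                                resolveClass(c);
                        }
                        return c;
                }
        }
}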

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
deleted file mode 100644
index 670c355..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.benchmark;
-
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.testutils.junit.RetryOnFailure;
-import org.apache.flink.testutils.junit.RetryRule;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.NativeLibraryLoader;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Test that validates that the performance of the APIs of RocksDB's ListState is as expected.
- *
- * <p>Benchmarking:
- *
- * <p>Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i7, Memory 16GB 1600MHz DDR3
- *
- * Number of values added | time for add() | time for update() | perf improvement of update() over add()
- * 10                     | 236875 ns      | 17048 ns          | 13.90x
- * 50                     | 312332 ns      | 14281 ns          | 21.87x
- * 100                    | 393791 ns      | 18360 ns          | 21.45x
- * 500                    | 978703 ns      | 55397 ns          | 17.66x
- * 1000                   | 3044179 ns     | 89474 ns          | 34.02x
- * 5000                   | 9247395 ns     | 305580 ns         | 30.26x
- * 10000                  | 16416442 ns    | 605963 ns         | 27.09x
- * 50000                  | 84311205 ns    | 5691288 ns        | 14.81x
- * 100000                 | 195103310 ns   | 12914182 ns       | 15.11x
- * 500000                 | 1223141510 ns  | 70595881 ns       | 17.33x
- *
- * <p>In summary, the update() API, which pre-merges all values, gives users a 15-35x
- * performance improvement over add(). For the most frequent use cases, where there are
- * a few hundred to a few thousand values per key, users can see a 30x - 35x improvement.
- */
-public class RocksDBListStatePerformanceTest extends TestLogger {
-
-       private static final byte DELIMITER = ',';
-
-       @Rule
-       public final TemporaryFolder tmp = new TemporaryFolder();
-
-       @Rule
-       public final RetryRule retry = new RetryRule();
-
-       @Test(timeout = 2000)
-       @RetryOnFailure(times = 3)
-       public void testRocksDbListStateAPIs() throws Exception {
-               final File rocksDir = tmp.newFolder();
-
-               // ensure the RocksDB library is loaded to a distinct location each retry
-               NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
-
-               final String key1 = "key1";
-               final String key2 = "key2";
-               final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
-
-               final byte[] keyBytes1 = key1.getBytes(StandardCharsets.UTF_8);
-               final byte[] keyBytes2 = key2.getBytes(StandardCharsets.UTF_8);
-               final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
-
-               // The number of values added to ListState. Can be changed for benchmarking
-               final int num = 10;
-
-               try (
-                       final Options options = new Options()
-                                       .setCompactionStyle(CompactionStyle.LEVEL)
-                                       .setLevelCompactionDynamicLevelBytes(true)
-                                       .setIncreaseParallelism(4)
-                                       .setUseFsync(false)
-                                       .setMaxOpenFiles(-1)
-                                       .setCreateIfMissing(true)
-                                       .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
-
-                       final WriteOptions writeOptions = new WriteOptions()
-                                       .setSync(false)
-                                       .setDisableWAL(true);
-
-                       final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
-
-                       // ----- add() API -----
-                       log.info("begin add");
-
-                       final long beginInsert1 = System.nanoTime();
-                       for (int i = 0; i < num; i++) {
-                               rocksDB.merge(writeOptions, keyBytes1, valueBytes);
-                       }
-                       final long endInsert1 = System.nanoTime();
-
-                       log.info("end add - duration: {} ns", (endInsert1 - beginInsert1));
-
-                       // ----- update() API -----
-
-                       List<byte[]> list = new ArrayList<>(num);
-                       for (int i = 0; i < num; i++) {
-                               list.add(valueBytes);
-                       }
-                       byte[] premerged = merge(list);
-
-                       log.info("begin update");
-
-                       final long beginInsert2 = System.nanoTime();
-                       rocksDB.merge(writeOptions, keyBytes2, premerged);
-                       final long endInsert2 = System.nanoTime();
-
-                       log.info("end update - duration: {} ns", (endInsert2 - beginInsert2));
-               }
-       }
-
-       /**
-        * Merge operands into a single value that can be put directly into RocksDB.
-        */
-       public static byte[] merge(List<byte[]> operands) {
-               if (operands == null || operands.isEmpty()) {
-                       return null;
-               }
-
-               if (operands.size() == 1) {
-                       return operands.get(0);
-               }
-
-               int numBytes = 0;
-               for (byte[] arr : operands) {
-                       numBytes += arr.length + 1;
-               }
-               numBytes--;
-
-               byte[] result = new byte[numBytes];
-
-               System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);
-
-               for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
-                       result[arrIndex] = DELIMITER;
-                       arrIndex += 1;
-                       System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
-                       arrIndex += operands.get(i).length;
-               }
-
-               return result;
-       }
-}
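
To make the delimiter layout produced by merge() concrete, here is a hypothetical inverse helper (not part of the deleted test) that splits a pre-merged value back into its operands. It assumes the delimiter byte ',' never occurs inside a value, which holds for the alphanumeric payloads used in the benchmark.

import java.util.ArrayList;
import java.util.List;

class MergeSplitExample {

        private static final byte DELIMITER = ',';

        // Inverse of merge() above: cut the concatenated value at each delimiter.
        static List<byte[]> split(byte[] merged) {
                List<byte[]> operands = new ArrayList<>();
                int start = 0;
                for (int i = 0; i <= merged.length; i++) {
                        if (i == merged.length || merged[i] == DELIMITER) {
                                byte[] operand = new byte[i - start];
                                System.arraycopy(merged, start, operand, 0, operand.length);
                                operands.add(operand);
                                start = i + 1;
                        }
                }
                return operands;
        }
}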

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
deleted file mode 100644
index e05e7ae..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.benchmark;
-
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.core.memory.MemoryUtils;
-import org.apache.flink.testutils.junit.RetryOnFailure;
-import org.apache.flink.testutils.junit.RetryRule;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.NativeLibraryLoader;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-import sun.misc.Unsafe;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-
-/**
- * Test that validates that the performance of RocksDB is as expected.
- * This test guards against the bug filed as 'FLINK-5756'.
- */
-public class RocksDBPerformanceTest extends TestLogger {
-
-       @Rule
-       public final TemporaryFolder tmp = new TemporaryFolder();
-
-       @Rule
-       public final RetryRule retry = new RetryRule();
-
-       private File rocksDir;
-       private Options options;
-       private WriteOptions writeOptions;
-
-       private final String key = "key";
-       private final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
-
-       private final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-       private final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
-
-       @Before
-       public void init() throws IOException {
-               rocksDir = tmp.newFolder();
-
-               // ensure the RocksDB library is loaded to a distinct location each retry
-               NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
-
-               options = new Options()
-                               .setCompactionStyle(CompactionStyle.LEVEL)
-                               .setLevelCompactionDynamicLevelBytes(true)
-                               .setIncreaseParallelism(4)
-                               .setUseFsync(false)
-                               .setMaxOpenFiles(-1)
-                               .setCreateIfMissing(true)
-                               .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
-
-               writeOptions = new WriteOptions()
-                               .setSync(false)
-                               .setDisableWAL(true);
-       }
-
-       @Test(timeout = 2000)
-       @RetryOnFailure(times = 3)
-       public void testRocksDbMergePerformance() throws Exception {
-               final int num = 50000;
-
-               try (RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
-
-                       // ----- insert -----
-                       log.info("begin insert");
-
-                       final long beginInsert = System.nanoTime();
-                       for (int i = 0; i < num; i++) {
-                               rocksDB.merge(writeOptions, keyBytes, valueBytes);
-                       }
-                       final long endInsert = System.nanoTime();
-                       log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000);
-
-                       // ----- read (attempt 1) -----
-
-                       final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
-                       final long beginGet1 = System.nanoTime();
-                       rocksDB.get(keyBytes, resultHolder);
-                       final long endGet1 = System.nanoTime();
-
-                       log.info("end get - duration: {} ms", (endGet1 - beginGet1) / 1_000_000);
-
-                       // ----- read (attempt 2) -----
-
-                       final long beginGet2 = System.nanoTime();
-                       rocksDB.get(keyBytes, resultHolder);
-                       final long endGet2 = System.nanoTime();
-
-                       log.info("end get - duration: {} ms", (endGet2 - beginGet2) / 1_000_000);
-
-                       // ----- compact -----
-                       log.info("compacting...");
-                       final long beginCompact = System.nanoTime();
-                       rocksDB.compactRange();
-                       final long endCompact = System.nanoTime();
-
-                       log.info("end compaction - duration: {} ms", (endCompact - beginCompact) / 1_000_000);
-
-                       // ----- read (attempt 3) -----
-
-                       final long beginGet3 = System.nanoTime();
-                       rocksDB.get(keyBytes, resultHolder);
-                       final long endGet3 = System.nanoTime();
-
-                       log.info("end get - duration: {} ms", (endGet3 - beginGet3) / 1_000_000);
-               }
-       }
-
-       @Test(timeout = 2000)
-       @RetryOnFailure(times = 3)
-       public void testRocksDbRangeGetPerformance() throws Exception {
-               final int num = 50000;
-
-               try (RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
-
-                       final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
-
-                       final Unsafe unsafe = MemoryUtils.UNSAFE;
-                       final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
-
-                       log.info("begin insert");
-
-                       final long beginInsert = System.nanoTime();
-                       for (int i = 0; i < num; i++) {
-                               unsafe.putInt(keyTemplate, offset, i);
-                               rocksDB.put(writeOptions, keyTemplate, valueBytes);
-                       }
-                       final long endInsert = System.nanoTime();
-                       log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000);
-
-                       @SuppressWarnings("MismatchedReadAndWriteOfArray")
-                       final byte[] resultHolder = new byte[num * valueBytes.length];
-
-                       final long beginGet = System.nanoTime();
-
-                       int pos = 0;
-
-                       try (final RocksIterator iterator = rocksDB.newIterator()) {
-                               // seek to start
-                               unsafe.putInt(keyTemplate, offset, 0);
-                               iterator.seek(keyTemplate);
-
-                               // iterate
-                               while (iterator.isValid() && samePrefix(keyBytes, iterator.key())) {
-                                       byte[] currValue = iterator.value();
-                                       System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
-                                       pos += currValue.length;
-                                       iterator.next();
-                               }
-                       }
-
-                       final long endGet = System.nanoTime();
-
-                       log.info("end get - duration: {} ms", (endGet - beginGet) / 1_000_000);
-               }
-       }
-
-       private static boolean samePrefix(byte[] prefix, byte[] key) {
-               for (int i = 0; i < prefix.length; i++) {
-                       if (prefix[i] != key[i]) {
-                               return false;
-                       }
-               }
-
-               return true;
-       }
-
-       @After
-       public void close() {
-               options.close();
-               writeOptions.close();
-       }
-}
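
The Unsafe-based key stamping in testRocksDbRangeGetPerformance is endianness-dependent; a hypothetical, Unsafe-free equivalent (not part of the test) could stamp the 4-byte suffix through a ByteBuffer view instead. Note that the range scan relies only on the shared key prefix, not on the numeric order of the suffix bytes.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

class KeySuffixExample {

        // Append a 4-byte int suffix after the fixed key prefix, mirroring
        // unsafe.putInt() by using the platform's native byte order.
        static byte[] stampKey(byte[] keyPrefix, int i) {
                byte[] keyTemplate = Arrays.copyOf(keyPrefix, keyPrefix.length + 4);
                ByteBuffer.wrap(keyTemplate, keyPrefix.length, 4)
                                .order(ByteOrder.nativeOrder())
                                .putInt(i);
                return keyTemplate;
        }
}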

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
deleted file mode 100644
index 881dc06..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-################################################################################
-
-# Set root logger level to OFF and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml
index 30cadfa..644f299 100644
--- a/flink-contrib/pom.xml
+++ b/flink-contrib/pom.xml
@@ -40,7 +40,6 @@ under the License.
                <module>flink-storm</module>
                <module>flink-storm-examples</module>
                <module>flink-connector-wikiedits</module>
-               <module>flink-statebackend-rocksdb</module>
        </modules>
 
        <dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml 
b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
new file mode 100644
index 0000000..16416c6
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-state-backends</artifactId>
+               <version>1.5-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+       <name>flink-statebackend-rocksdb</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.rocksdb</groupId>
+                       <artifactId>rocksdbjni</artifactId>
+                       <version>5.6.1</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
new file mode 100644
index 0000000..969a1fc
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+
+/**
+ * Base class for {@link State} implementations that store state in a RocksDB database.
+ *
+ * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
+ * the {@link RocksDBStateBackend} manages and checkpoints.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <S> The type of {@link State}.
+ * @param <SD> The type of {@link StateDescriptor}.
+ * @param <V> The type of the values in the state.
+ */
+public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
+               implements InternalKvState<N>, State {
+
+       /** Serializer for the namespace. */
+       final TypeSerializer<N> namespaceSerializer;
+
+       /** The current namespace, which the next value methods will refer to. */
+       private N currentNamespace;
+
+       /** Backend that holds the actual RocksDB instance where we store state. */
+       protected RocksDBKeyedStateBackend<K> backend;
+
+       /** The column family of this particular instance of state. */
+       protected ColumnFamilyHandle columnFamily;
+
+       /** State descriptor from which to create this state instance. */
+       protected final SD stateDesc;
+
+       /**
+        * We disable writes to the write-ahead-log here.
+        */
+       private final WriteOptions writeOptions;
+
+       protected final ByteArrayOutputStreamWithPos keySerializationStream;
+       protected final DataOutputView keySerializationDataOutputView;
+
+       private final boolean ambiguousKeyPossible;
+
+       /**
+        * Creates a new RocksDB backed state.
+        *
+        * @param columnFamily The RocksDB column family that this state is associated to.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state descriptor from which this state was created.
+        * @param backend The backend that holds the RocksDB instance.
+        */
+       protected AbstractRocksDBState(
+                       ColumnFamilyHandle columnFamily,
+                       TypeSerializer<N> namespaceSerializer,
+                       SD stateDesc,
+                       RocksDBKeyedStateBackend<K> backend) {
+
+               this.namespaceSerializer = namespaceSerializer;
+               this.backend = backend;
+
+               this.columnFamily = columnFamily;
+
+               writeOptions = new WriteOptions();
+               writeOptions.setDisableWAL(true);
+               this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
+
+               this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
+               this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
+               this.ambiguousKeyPossible = (backend.getKeySerializer().getLength() < 0)
+                               && (namespaceSerializer.getLength() < 0);
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public void clear() {
+               try {
+                       writeCurrentKeyWithGroupAndNamespace();
+                       byte[] key = keySerializationStream.toByteArray();
+                       backend.db.remove(columnFamily, writeOptions, key);
+               } catch (IOException | RocksDBException e) {
+                       throw new RuntimeException("Error while removing entry from RocksDB", e);
+               }
+       }
+
+       @Override
+       public void setCurrentNamespace(N namespace) {
+               this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+               Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+
+               //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
+               Tuple2<K, N> des = KvStateSerializer.<K, N>deserializeKeyAndNamespace(
+                               serializedKeyAndNamespace,
+                               backend.getKeySerializer(),
+                               namespaceSerializer);
+
+               int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
+
+               // we cannot reuse the keySerializationStream member since this method
+               // is called concurrently with the other methods and it may thus contain garbage
+               ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
+               DataOutputViewStreamWrapper tmpKeySerializationDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+               writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+                       tmpKeySerializationStream, tmpKeySerializationDataOutputView);
+
+               return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
+       }
+
+       protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
+               writeKeyWithGroupAndNamespace(
+                       backend.getCurrentKeyGroupIndex(),
+                       backend.getCurrentKey(),
+                       currentNamespace,
+                       keySerializationStream,
+                       keySerializationDataOutputView);
+       }
+
+       protected void writeKeyWithGroupAndNamespace(
+                       int keyGroup, K key, N namespace,
+                       ByteArrayOutputStreamWithPos keySerializationStream,
+                       DataOutputView keySerializationDataOutputView) throws IOException {
+
+               Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
+
+               keySerializationStream.reset();
+               writeKeyGroup(keyGroup, keySerializationDataOutputView);
+               writeKey(key, keySerializationStream, keySerializationDataOutputView);
+               writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView);
+       }
+
+       private void writeKeyGroup(
+                       int keyGroup,
+                       DataOutputView keySerializationDataOutputView) throws IOException {
+               for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
+                       keySerializationDataOutputView.writeByte(keyGroup >>> (i << 3));
+               }
+       }
+
+       private void writeKey(
+                       K key,
+                       ByteArrayOutputStreamWithPos keySerializationStream,
+                       DataOutputView keySerializationDataOutputView) throws IOException {
+               //write key
+               int beforeWrite = keySerializationStream.getPosition();
+               backend.getKeySerializer().serialize(key, keySerializationDataOutputView);
+
+               if (ambiguousKeyPossible) {
+                       //write size of key
+                       writeLengthFrom(beforeWrite, keySerializationStream, keySerializationDataOutputView);
+               }
+       }
+
+       private void writeNameSpace(
+                       N namespace,
+                       ByteArrayOutputStreamWithPos keySerializationStream,
+                       DataOutputView keySerializationDataOutputView) throws IOException {
+               int beforeWrite = keySerializationStream.getPosition();
+               namespaceSerializer.serialize(namespace, keySerializationDataOutputView);
+
+               if (ambiguousKeyPossible) {
+                       //write length of namespace
+                       writeLengthFrom(beforeWrite, keySerializationStream, keySerializationDataOutputView);
+               }
+       }
+
+       private static void writeLengthFrom(
+                       int fromPosition,
+                       ByteArrayOutputStreamWithPos keySerializationStream,
+                       DataOutputView keySerializationDataOutputView) throws IOException {
+               int length = keySerializationStream.getPosition() - fromPosition;
+               writeVariableIntBytes(length, keySerializationDataOutputView);
+       }
+
+       private static void writeVariableIntBytes(
+                       int value,
+                       DataOutputView keySerializationDataOutputView)
+                       throws IOException {
+               do {
+                       keySerializationDataOutputView.writeByte(value);
+                       value >>>= 8;
+               } while (value != 0);
+       }
+
+       protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
+               int keyGroup = readKeyGroup(inputView);
+               K key = readKey(inputStream, inputView);
+               N namespace = readNamespace(inputStream, inputView);
+
+               return new Tuple3<>(keyGroup, key, namespace);
+       }
+
+       private int readKeyGroup(DataInputView inputView) throws IOException {
+               int keyGroup = 0;
+               for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) {
+                       keyGroup <<= 8;
+                       keyGroup |= (inputView.readByte() & 0xFF);
+               }
+               return keyGroup;
+       }
+
+       private K readKey(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
+               int beforeRead = inputStream.getPosition();
+               K key = backend.getKeySerializer().deserialize(inputView);
+               if (ambiguousKeyPossible) {
+                       int length = inputStream.getPosition() - beforeRead;
+                       readVariableIntBytes(inputView, length);
+               }
+               return key;
+       }
+
+       private N readNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
+               int beforeRead = inputStream.getPosition();
+               N namespace = namespaceSerializer.deserialize(inputView);
+               if (ambiguousKeyPossible) {
+                       int length = inputStream.getPosition() - beforeRead;
+                       readVariableIntBytes(inputView, length);
+               }
+               return namespace;
+       }
+
+       private void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
+               do {
+                       inputView.readByte();
+                       value >>>= 8;
+               } while (value != 0);
+       }
+}
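
To make the composite key layout written by this class concrete: each RocksDB key is the key-group index (big-endian, getKeyGroupPrefixBytes() bytes), followed by the serialized key, followed by the serialized namespace, with length markers appended only when both serializers are variable-length. Below is a small, hypothetical standalone sketch of that layout, assuming fixed-length serializations (so no length fields are needed):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

class RocksDBKeyLayoutExample {

        // Layout: [key-group prefix][serialized key][serialized namespace]
        static byte[] buildCompositeKey(int keyGroup, int prefixBytes, byte[] keyBytes, byte[] namespaceBytes) throws IOException {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                // big-endian key-group prefix, matching writeKeyGroup() above
                for (int i = prefixBytes; --i >= 0;) {
                        out.write(keyGroup >>> (i << 3));
                }
                out.write(keyBytes);
                out.write(namespaceBytes);
                return out.toByteArray();
        }
}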

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
new file mode 100644
index 0000000..34f7f62
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+/**
+ * A factory for {@link DBOptions} and {@link ColumnFamilyOptions} to be passed to the
+ * {@link RocksDBStateBackend}. Options have to be created lazily by this factory,
+ * because the options classes are not serializable and hold pointers to native code.
+ *
+ * <p>A typical pattern to use this OptionsFactory is as follows (note that the
+ * interface declares two methods, so it cannot be implemented as a lambda):
+ *
+ * <pre>{@code
+ * rocksDbBackend.setOptions(new OptionsFactory() {
+ *
+ *     public DBOptions createDBOptions(DBOptions currentOptions) {
+ *         return currentOptions.setMaxOpenFiles(1024);
+ *     }
+ *
+ *     public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+ *         return currentOptions;
+ *     }
+ * });
+ * }</pre>
+ */
+public interface OptionsFactory extends java.io.Serializable {
+
+       /**
+        * This method should set the additional options on top of the current options object.
+        * The current options object may contain pre-defined options based on flags that have
+        * been configured on the state backend.
+        *
+        * <p>It is important to set the options on the current object and return the result from
+        * the setter methods, otherwise the pre-defined options may get lost.
+        *
+        * @param currentOptions The options object with the pre-defined options.
+        * @return The options object on which the additional options are set.
+        */
+       DBOptions createDBOptions(DBOptions currentOptions);
+
+       /**
+        * This method should set the additional options on top of the current options object.
+        * The current options object may contain pre-defined options based on flags that have
+        * been configured on the state backend.
+        *
+        * <p>It is important to set the options on the current object and return the result from
+        * the setter methods, otherwise the pre-defined options may get lost.
+        *
+        * @param currentOptions The options object with the pre-defined options.
+        * @return The options object on which the additional options are set.
+        */
+       ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions);
+
+}
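
As a usage sketch, wiring such a factory into the backend could look as follows; the checkpoint URI is a placeholder, and the option values are illustrative only.

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;

class OptionsFactoryUsageExample {

        static RocksDBStateBackend configuredBackend() throws Exception {
                // "file:///tmp/checkpoints" is a placeholder checkpoint URI
                RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend("file:///tmp/checkpoints");

                rocksDbBackend.setOptions(new OptionsFactory() {

                        @Override
                        public DBOptions createDBOptions(DBOptions currentOptions) {
                                // add to, and return, the pre-defined options object
                                return currentOptions.setMaxOpenFiles(1024);
                        }

                        @Override
                        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                                return currentOptions.setCompactionStyle(CompactionStyle.LEVEL);
                        }
                });

                return rocksDbBackend;
        }
}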
