Repository: flink
Updated Branches:
  refs/heads/master dcea46e89 -> edae79340


[FLINK-3358] [FLINK-3351] [rocksdb] Add simple constructors and automatic temp 
path configuration

This adds constructors that take only a backup directory URI and use it to
initialize both the RocksDB file backups and the FileSystem state backend for
non-partitioned state.

Also, the RocksDBStateBackend now automatically picks up the TaskManager's temp
directories if no local storage directories are explicitly configured.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edae7934
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edae7934
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edae7934

Branch: refs/heads/master
Commit: edae79340dd486915d25109cbdc1485accae665a
Parents: be72758
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 11 21:30:36 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 11 21:34:03 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 241 ++++++++++++++--
 .../state/RocksDBStateBackendConfigTest.java    | 280 +++++++++++++++++++
 .../state/RocksDBStateBackendTest.java          |   4 +-
 .../state/filesystem/FsStateBackend.java        |  99 ++++---
 .../EventTimeWindowCheckpointingITCase.java     |  13 +-
 5 files changed, 566 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index eddd8c0..5b16e86 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -18,7 +18,12 @@
 package org.apache.flink.contrib.streaming.state;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
@@ -28,13 +33,17 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.api.common.state.StateBackend;
 
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.rocksdb.Options;
 import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -54,20 +63,35 @@ import static java.util.Objects.requireNonNull;
 public class RocksDBStateBackend extends AbstractStateBackend {
        private static final long serialVersionUID = 1L;
 
-       /** Base path for RocksDB directory. */
-       private final String dbBasePath;
-
-       /** The checkpoint directory that we snapshot RocksDB backups to. */
-       private final String checkpointDirectory;
+       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateBackend.class);
+       
+       
+       /** The checkpoint directory that we copy the RocksDB backups to. */
+       private final Path checkpointDirectory;
 
+       /** The state backend that stores the non-partitioned state */
+       private final AbstractStateBackend nonPartitionedStateBackend;
+       
+       
        /** Operator identifier that is used to uniqueify the RocksDB storage 
path. */
        private String operatorIdentifier;
 
        /** JobID for uniquifying backup paths. */
        private JobID jobId;
+       
 
-       private AbstractStateBackend backingStateBackend;
+       // DB storage directories
+       
+       /** Base paths for RocksDB directory, as configured. May be null. */
+       private Path[] dbBasePaths;
 
+       /** Base paths for RocksDB directory, as initialized */
+       private File[] dbStorageDirectories;
+       
+       private int nextDirectory;
+       
+       // RocksDB options
+       
        /** The pre-configured option settings */
        private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
        
@@ -79,31 +103,112 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        
        // 
------------------------------------------------------------------------
 
-       public RocksDBStateBackend(String dbBasePath, String 
checkpointDirectory, AbstractStateBackend backingStateBackend) {
-               this.dbBasePath = requireNonNull(dbBasePath);
-               this.checkpointDirectory = requireNonNull(checkpointDirectory);
-               this.backingStateBackend = requireNonNull(backingStateBackend);
+       /**
+        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint 
data in the
+        * file system and location defined by the given URI.
+        * 
+        * <p>A state backend that stores checkpoints in HDFS or S3 must 
specify the file system
+        * host and port in the URI, or have the Hadoop configuration that 
describes the file system
+        * (host / high-availability group / possibly credentials) either 
referenced from the Flink
+        * config, or included in the classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem and path 
to the checkpoint data directory.
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public RocksDBStateBackend(String checkpointDataUri) throws IOException 
{
+               this(new Path(checkpointDataUri).toUri());
        }
 
+       /**
+        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint 
data in the
+        * file system and location defined by the given URI.
+        *
+        * <p>A state backend that stores checkpoints in HDFS or S3 must 
specify the file system
+        * host and port in the URI, or have the Hadoop configuration that 
describes the file system
+        * (host / high-availability group / possibly credentials) either 
referenced from the Flink
+        * config, or included in the classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem and path 
to the checkpoint data directory.
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
+               // creating the FsStateBackend automatically sanity checks the 
URI
+               FsStateBackend fsStateBackend = new 
FsStateBackend(checkpointDataUri);
+               
+               this.nonPartitionedStateBackend = fsStateBackend;
+               this.checkpointDirectory = fsStateBackend.getBasePath();
+       }
+
+
+       public RocksDBStateBackend(
+                       String checkpointDataUri, AbstractStateBackend 
nonPartitionedStateBackend) throws IOException {
+               
+               this(new Path(checkpointDataUri).toUri(), 
nonPartitionedStateBackend);
+       }
+       
+       public RocksDBStateBackend(
+                       URI checkpointDataUri, AbstractStateBackend 
nonPartitionedStateBackend) throws IOException {
+
+               this.nonPartitionedStateBackend = 
requireNonNull(nonPartitionedStateBackend);
+               this.checkpointDirectory = 
FsStateBackend.validateAndNormalizeUri(checkpointDataUri);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  State backend methods
        // 
------------------------------------------------------------------------
        
        @Override
        public void initializeForJob(
                        Environment env, 
                        String operatorIdentifier,
-                       TypeSerializer<?> keySerializer) throws Exception
-       {
+                       TypeSerializer<?> keySerializer) throws Exception {
+               
                super.initializeForJob(env, operatorIdentifier, keySerializer);
+
+               this.nonPartitionedStateBackend.initializeForJob(env, 
operatorIdentifier, keySerializer);
+               
                this.operatorIdentifier = operatorIdentifier.replace(" ", "");
-               backingStateBackend.initializeForJob(env, operatorIdentifier, 
keySerializer);
                this.jobId = env.getJobID();
+               
+               // initialize the paths where the local RocksDB files should be 
stored
+               if (dbBasePaths == null) {
+                       // initialize from the temp directories
+                       dbStorageDirectories = 
env.getIOManager().getSpillingDirectories();
+               }
+               else {
+                       List<File> dirs = new ArrayList<>(dbBasePaths.length);
+                       String errorMessage = "";
+                       
+                       for (Path path : dbBasePaths) {
+                               File f = new File(path.toUri().getPath());
+                               if (!f.exists() && !f.mkdirs()) {
+                                       String msg = "Local DB files directory 
'" + f.getAbsolutePath()
+                                                       + "' does not exist and 
cannot be created. ";
+                                       LOG.error(msg);
+                                       errorMessage += msg;
+                               }
+                               dirs.add(f);
+                       }
+                       
+                       if (dirs.isEmpty()) {
+                               throw new Exception("No local storage 
directories available. " + errorMessage);
+                       } else {
+                               dbStorageDirectories = dirs.toArray(new 
File[dirs.size()]);
+                       }
+               }
+               
+               nextDirectory = new 
Random().nextInt(dbStorageDirectories.length);
        }
 
        @Override
-       public void disposeAllStateForCurrentJob() throws Exception {}
+       public void disposeAllStateForCurrentJob() throws Exception {
+               nonPartitionedStateBackend.disposeAllStateForCurrentJob();
+       }
 
        @Override
        public void close() throws Exception {
+               nonPartitionedStateBackend.close();
+               
                Options opt = this.rocksDbOptions;
                if (opt != null) {
                        opt.dispose();
@@ -111,13 +216,25 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                }
        }
 
-       private File getDbPath(String stateName) {
-               return new File(new File(new File(new File(dbBasePath), 
jobId.toString()), operatorIdentifier), stateName);
+       File getDbPath(String stateName) {
+               return new File(new File(new File(getNextStoragePath(), 
jobId.toString()), operatorIdentifier), stateName);
        }
 
-       private String getCheckpointPath(String stateName) {
+       String getCheckpointPath(String stateName) {
                return checkpointDirectory + "/" + jobId.toString() + "/" + 
operatorIdentifier + "/" + stateName;
        }
+       
+       File[] getStoragePaths() {
+               return dbStorageDirectories;
+       }
+       
+       File getNextStoragePath() {
+               int ni = nextDirectory + 1;
+               ni = ni >= dbStorageDirectories.length ? 0 : ni;
+               nextDirectory = ni;
+               
+               return dbStorageDirectories[ni];
+       }
 
        // 
------------------------------------------------------------------------
        //  State factories
@@ -154,20 +271,94 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
-       public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID,
-               long timestamp) throws Exception {
-               return 
backingStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
+       public CheckpointStateOutputStream createCheckpointStateOutputStream(
+                       long checkpointID, long timestamp) throws Exception {
+               
+               return 
nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, 
timestamp);
        }
 
        @Override
-       public <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(S state,
-               long checkpointID,
-               long timestamp) throws Exception {
-               return backingStateBackend.checkpointStateSerializable(state, 
checkpointID, timestamp);
+       public <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(
+                       S state, long checkpointID, long timestamp) throws 
Exception {
+               
+               return 
nonPartitionedStateBackend.checkpointStateSerializable(state, checkpointID, 
timestamp);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Parameters
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Sets the path where the RocksDB local database files should be 
stored on the local
+        * file system. Setting this path overrides the default behavior, where 
the
+        * files are stored across the configured temp directories.
+        * 
+        * <p>Passing {@code null} to this function restores the default 
behavior, where the configured
+        * temp directories will be used.
+        * 
+        * @param path The path where the local RocksDB database files are 
stored.
+        */
+       public void setDbStoragePath(String path) {
+               setDbStoragePaths(path == null ? null : new String[] { path });
+       }
+
+       /**
+        * Sets the paths across which the local RocksDB database files are 
distributed on the local
+        * file system. Setting these paths overrides the default behavior, 
where the
+        * files are stored across the configured temp directories.
+        * 
+        * <p>Each distinct state will be stored in one path, but when the 
state backend creates
+        * multiple states, they will store their files on different paths.
+        * 
+        * <p>Passing {@code null} to this function restores the default 
behavior, where the configured
+        * temp directories will be used.
+        * 
+        * @param paths The paths across which the local RocksDB database files 
will be spread. 
+        */
+       public void setDbStoragePaths(String... paths) {
+               if (paths == null) {
+                       dbBasePaths = null;
+               } 
+               else if (paths.length == 0) {
+                       throw new IllegalArgumentException("empty paths");
+               }
+               else {
+                       Path[] pp = new Path[paths.length];
+                       
+                       for (int i = 0; i < paths.length; i++) {
+                               if (paths[i] == null) {
+                                       throw new 
IllegalArgumentException("null path");
+                               }
+                               
+                               pp[i] = new Path(paths[i]);
+                               String scheme = pp[i].toUri().getScheme();
+                               if (scheme != null && 
!scheme.equalsIgnoreCase("file")) {
+                                       throw new 
IllegalArgumentException("Path " + paths[i] + " has a non local scheme");
+                               }
+                       }
+                       
+                       dbBasePaths = pp;
+               }
+       }
+
+       /**
+        * Gets the local DB storage paths as configured via the setters.
+        * @return The configured DB storage paths, or null, if none were 
configured. 
+        */
+       public String[] getDbStoragePaths() {
+               if (dbBasePaths == null) {
+                       return null;
+               } else {
+                       String[] paths = new String[dbBasePaths.length];
+                       for (int i = 0; i < paths.length; i++) {
+                               paths[i] = dbBasePaths[i].toString();
+                       }
+                       return paths;
+               }
        }
        
        // 
------------------------------------------------------------------------
-       //  Parametrize with Options
+       //  Parametrize with RocksDB Options
        // 
------------------------------------------------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
new file mode 100644
index 0000000..e62d39c
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Test;
+
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+
+import java.io.File;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for configuring the RocksDB State Backend 
+ */
+@SuppressWarnings("serial")
+public class RocksDBStateBackendConfigTest {
+       
+       private static final String TEMP_URI = new 
File(System.getProperty("java.io.tmpdir")).toURI().toString();
+
+       // 
------------------------------------------------------------------------
+       //  RocksDB local file directory
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void testSetDbPath() throws Exception {
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+               
+               assertNull(rocksDbBackend.getDbStoragePaths());
+               
+               rocksDbBackend.setDbStoragePath("/abc/def");
+               assertArrayEquals(new String[] { "/abc/def" }, 
rocksDbBackend.getDbStoragePaths());
+
+               rocksDbBackend.setDbStoragePath(null);
+               assertNull(rocksDbBackend.getDbStoragePaths());
+
+               rocksDbBackend.setDbStoragePaths("/abc/def", "/uvw/xyz");
+               assertArrayEquals(new String[] { "/abc/def", "/uvw/xyz" }, 
rocksDbBackend.getDbStoragePaths());
+
+               //noinspection NullArgumentToVariableArgMethod
+               rocksDbBackend.setDbStoragePaths(null);
+               assertNull(rocksDbBackend.getDbStoragePaths());
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testSetNullPaths() throws Exception {
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+               rocksDbBackend.setDbStoragePaths();
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testNonFileSchemePath() throws Exception {
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+               
rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
+       }
+
+       // 
------------------------------------------------------------------------
+       //  RocksDB local file automatic from temp directories
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void testUseTempDirectories() throws Exception {
+               File dir1 = new File(System.getProperty("java.io.tmpdir"), 
UUID.randomUUID().toString());
+               File dir2 = new File(System.getProperty("java.io.tmpdir"), 
UUID.randomUUID().toString());
+
+               File[] tempDirs = new File[] { dir1, dir2 };
+               
+               try {
+                       assertTrue(dir1.mkdirs());
+                       assertTrue(dir2.mkdirs());
+
+                       RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+                       assertNull(rocksDbBackend.getDbStoragePaths());
+                       
+                       
rocksDbBackend.initializeForJob(getMockEnvironment(tempDirs), "foobar", 
IntSerializer.INSTANCE);
+                       assertArrayEquals(tempDirs, 
rocksDbBackend.getStoragePaths());
+               }
+               finally {
+                       FileUtils.deleteDirectory(dir1);
+                       FileUtils.deleteDirectory(dir2);
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  RocksDB local file directory initialization
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testFailWhenNoLocalStorageDir() throws Exception {
+               File targetDir = new File(System.getProperty("java.io.tmpdir"), 
UUID.randomUUID().toString());
+               try {
+                       assertTrue(targetDir.mkdirs());
+                       
+                       if (!targetDir.setWritable(false, false)) {
+                               System.err.println("Cannot execute 
'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable");
+                               return;
+                       }
+                       
+                       RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+                       
rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
+                       
+                       try {
+                               
rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", 
IntSerializer.INSTANCE);
+                       }
+                       catch (Exception e) {
+                               assertTrue(e.getMessage().contains("No local 
storage directories available"));
+                               
assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
+                       }
+               }
+               finally {
+                       //noinspection ResultOfMethodCallIgnored
+                       targetDir.setWritable(true, false);
+                       FileUtils.deleteDirectory(targetDir);
+               }
+       }
+
+       @Test
+       public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
+               File targetDir1 = new 
File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+               File targetDir2 = new 
File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+               
+               try {
+                       assertTrue(targetDir1.mkdirs());
+                       assertTrue(targetDir2.mkdirs());
+       
+                       if (!targetDir1.setWritable(false, false)) {
+                               System.err.println("Cannot execute 
'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory 
non-writable");
+                               return;
+                       }
+       
+                       RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+                       
rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), 
targetDir2.getAbsolutePath());
+       
+                       try {
+                               
rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", 
IntSerializer.INSTANCE);
+                       }
+                       catch (Exception e) {
+                               e.printStackTrace();
+                               fail("Backend initialization failed even though 
some paths were available");
+                       }
+               } finally {
+                       //noinspection ResultOfMethodCallIgnored
+                       targetDir1.setWritable(true, false);
+                       FileUtils.deleteDirectory(targetDir1);
+                       FileUtils.deleteDirectory(targetDir2);
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  RocksDB Options
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void testPredefinedOptions() throws Exception {
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+               
+               assertEquals(PredefinedOptions.DEFAULT, 
rocksDbBackend.getPredefinedOptions());
+               
+               
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
+               assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
+               
+               Options opt1 = rocksDbBackend.getRocksDBOptions();
+               Options opt2 = rocksDbBackend.getRocksDBOptions();
+               
+               assertEquals(opt1, opt2);
+               
+               assertEquals(CompactionStyle.LEVEL, opt1.compactionStyle());
+       }
+
+       @Test
+       public void testOptionsFactory() throws Exception {
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+               
+               rocksDbBackend.setOptions(new OptionsFactory() {
+                       @Override
+                       public Options createOptions(Options currentOptions) {
+                               return 
currentOptions.setCompactionStyle(CompactionStyle.FIFO);
+                       }
+               });
+               
+               assertNotNull(rocksDbBackend.getOptions());
+               assertEquals(CompactionStyle.FIFO, 
rocksDbBackend.getRocksDBOptions().compactionStyle());
+       }
+
+       @Test
+       public void testPredefinedAndOptionsFactory() throws Exception {
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI);
+
+               assertEquals(PredefinedOptions.DEFAULT, 
rocksDbBackend.getPredefinedOptions());
+
+               
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
+               rocksDbBackend.setOptions(new OptionsFactory() {
+                       @Override
+                       public Options createOptions(Options currentOptions) {
+                               return 
currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
+                       }
+               });
+               
+               assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
+               assertNotNull(rocksDbBackend.getOptions());
+               assertEquals(CompactionStyle.UNIVERSAL, 
rocksDbBackend.getRocksDBOptions().compactionStyle());
+       }
+
+       @Test
+       public void testPredefinedOptionsEnum() {
+               for (PredefinedOptions o : PredefinedOptions.values()) {
+                       Options opt = o.createOptions();
+                       try {
+                               assertNotNull(opt);
+                       } finally {
+                               opt.dispose();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Contained Non-partitioned State Backend
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void testCallsForwardedToNonPartitionedBackend() throws 
Exception {
+               AbstractStateBackend nonPartBackend = 
mock(AbstractStateBackend.class);
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(TEMP_URI, nonPartBackend);
+
+               rocksDbBackend.initializeForJob(getMockEnvironment(), "foo", 
IntSerializer.INSTANCE);
+               verify(nonPartBackend, 
times(1)).initializeForJob(any(Environment.class), anyString(), 
any(TypeSerializer.class));
+
+               rocksDbBackend.disposeAllStateForCurrentJob();
+               verify(nonPartBackend, times(1)).disposeAllStateForCurrentJob();
+               
+               rocksDbBackend.close();
+               verify(nonPartBackend, times(1)).close();
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       private static Environment getMockEnvironment() {
+               return getMockEnvironment(new File[] { new 
File(System.getProperty("java.io.tmpdir")) });
+       }
+       
+       private static Environment getMockEnvironment(File[] tempDirs) {
+               IOManager ioMan = mock(IOManager.class);
+               when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
+               
+               Environment env = mock(Environment.class);
+               when(env.getJobID()).thenReturn(new JobID());
+               
when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
+               when(env.getIOManager()).thenReturn(ioMan);
+               return env;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 3b3ac31..fe933e0 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -40,7 +40,9 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                dbDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
                chkDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
 
-               return new RocksDBStateBackend(dbDir.getAbsolutePath(), 
"file://" + chkDir.getAbsolutePath(), new MemoryStateBackend());
+               RocksDBStateBackend backend = new 
RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
+               backend.setDbStoragePath(dbDir.getAbsolutePath());
+               return backend;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 411b536..37c1392 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -97,7 +97,7 @@ public class FsStateBackend extends AbstractStateBackend {
         * classpath.
         *
         * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
-        *                          and the path to teh checkpoint data 
directory.
+        *                          and the path to the checkpoint data 
directory.
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
        public FsStateBackend(String checkpointDataUri) throws IOException {
@@ -116,7 +116,7 @@ public class FsStateBackend extends AbstractStateBackend {
         * classpath.
         *
         * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
-        *                          and the path to teh checkpoint data 
directory.
+        *                          and the path to the checkpoint data 
directory.
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
        public FsStateBackend(Path checkpointDataUri) throws IOException {
@@ -161,21 +161,6 @@ public class FsStateBackend extends AbstractStateBackend {
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
        public FsStateBackend(URI checkpointDataUri, int 
fileStateSizeThreshold) throws IOException {
-               final String scheme = checkpointDataUri.getScheme();
-               final String path = checkpointDataUri.getPath();
-
-               // some validity checks
-               if (scheme == null) {
-                       throw new IllegalArgumentException("The scheme 
(hdfs://, file://, etc) is null. " +
-                                       "Please specify the file system scheme 
explicitly in the URI.");
-               }
-               if (path == null) {
-                       throw new IllegalArgumentException("The path to store 
the checkpoint data in is null. " +
-                                       "Please specify a directory path for 
the checkpoint data.");
-               }
-               if (path.length() == 0 || path.equals("/")) {
-                       throw new IllegalArgumentException("Cannot use the root 
directory for checkpoints.");
-               }
                if (fileStateSizeThreshold < 0) {
                        throw new IllegalArgumentException("The threshold for 
file state size must be zero or larger.");
                }
@@ -183,30 +168,10 @@ public class FsStateBackend extends AbstractStateBackend {
                        throw new IllegalArgumentException("The threshold for 
file state size cannot be larger than " +
                                MAX_FILE_STATE_THRESHOLD);
                }
-
-               // we do a bit of work to make sure that the URI for the 
filesystem refers to exactly the same
-               // (distributed) filesystem on all hosts and includes full 
host/port information, even if the
-               // original URI did not include that. We count on the 
filesystem loading from the configuration
-               // to fill in the missing data.
-
-               // try to grab the file system for this path/URI
-               this.filesystem = FileSystem.get(checkpointDataUri);
-               if (this.filesystem == null) {
-                       throw new IOException("Could not find a file system for 
the given scheme in the available configurations.");
-               }
-
-               URI fsURI = this.filesystem.getUri();
-               try {
-                       URI baseURI = new URI(fsURI.getScheme(), 
fsURI.getAuthority(), path, null, null);
-                       this.basePath = new Path(baseURI);
-               }
-               catch (URISyntaxException e) {
-                       throw new IOException(
-                                       String.format("Cannot create file 
system URI for checkpointDataUri %s and filesystem URI %s",
-                                                       checkpointDataUri, 
fsURI), e);
-               }
-               
                this.fileStateThreshold = fileStateSizeThreshold;
+               
+               this.basePath = validateAndNormalizeUri(checkpointDataUri);
+               this.filesystem = this.basePath.getFileSystem();
        }
 
        /**
@@ -371,6 +336,60 @@ public class FsStateBackend extends AbstractStateBackend {
                        "File State Backend (initialized) @ " + 
checkpointDirectory;
        }
 
+       /**
+        * Checks and normalizes the checkpoint data URI. This method first 
checks the validity of the
+        * URI (scheme, path, availability of a matching file system) and then 
normalizes the URI
+        * to a path.
+        * 
+        * <p>If the URI does not include an authority, but the file system 
configured for the URI has an
+        * authority, then the normalized path will include this authority.
+        * 
+        * <p>The returned path is rebuilt from the scheme and authority of the
+        * resolved file system plus the path component of the given URI; any
+        * query or fragment of the given URI is not carried over.
+        * 
+        * @param checkpointDataUri The URI to check and normalize.
+        * @return A normalized URI as a Path.
+        * 
+        * @throws IllegalArgumentException Thrown, if the URI misses scheme or
+        *         path, or if the path is empty or the root directory ("/").
+        * @throws IOException Thrown, if no file system can be found for the
+        *         URI's scheme, or if the normalized URI cannot be constructed.
+        */
+       public static Path validateAndNormalizeUri(URI checkpointDataUri) 
throws IOException {
+               final String scheme = checkpointDataUri.getScheme();
+               final String path = checkpointDataUri.getPath();
+
+               // some validity checks
+               if (scheme == null) {
+                       throw new IllegalArgumentException("The scheme 
(hdfs://, file://, etc) is null. " +
+                                       "Please specify the file system scheme 
explicitly in the URI.");
+               }
+               if (path == null) {
+                       throw new IllegalArgumentException("The path to store 
the checkpoint data in is null. " +
+                                       "Please specify a directory path for 
the checkpoint data.");
+               }
+               if (path.length() == 0 || path.equals("/")) {
+                       throw new IllegalArgumentException("Cannot use the root 
directory for checkpoints.");
+               }
+
+               // we do a bit of work to make sure that the URI for the 
filesystem refers to exactly the same
+               // (distributed) filesystem on all hosts and includes full 
host/port information, even if the
+               // original URI did not include that. We count on the 
filesystem loading from the configuration
+               // to fill in the missing data.
+
+               // try to grab the file system for this path/URI
+               FileSystem filesystem = FileSystem.get(checkpointDataUri);
+               if (filesystem == null) {
+                       throw new IOException("Could not find a file system for 
the given scheme in the available configurations.");
+               }
+
+               URI fsURI = filesystem.getUri();
+               try {
+                       URI baseURI = new URI(fsURI.getScheme(), 
fsURI.getAuthority(), path, null, null);
+                       return new Path(baseURI);
+               }
+               catch (URISyntaxException e) {
+                       throw new IOException(
+                                       String.format("Cannot create file 
system URI for checkpointDataUri %s and filesystem URI %s",
+                                                       checkpointDataUri, 
fsURI), e);
+               }
+       }
+       
        // 
------------------------------------------------------------------------
        //  Output stream for state checkpointing
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 2039528..9400bd7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -111,16 +111,19 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                        case MEM:
                                this.stateBackend = new MemoryStateBackend();
                                break;
-                       case FILE:
+                       case FILE: {
                                String backups = 
tempFolder.newFolder().getAbsolutePath();
                                this.stateBackend = new 
FsStateBackend("file://" + backups);
                                break;
-                       case ROCKSDB:
+                       }
+                       case ROCKSDB: {
                                String rocksDb = 
tempFolder.newFolder().getAbsolutePath();
-                               String rocksDbBackups = 
tempFolder.newFolder().getAbsolutePath();
-
-                               this.stateBackend = new 
RocksDBStateBackend(rocksDb, "file://" + rocksDbBackups, new 
MemoryStateBackend());
+                               String rocksDbBackups = 
tempFolder.newFolder().toURI().toString();
+                               RocksDBStateBackend rdb = new 
RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+                               rdb.setDbStoragePath(rocksDb);
+                               this.stateBackend = rdb;
                                break;
+                       }
                }
        }
 

Reply via email to