[hotfix] improve javadoc and logging of RocksDBKeyedStateBackend

This closes #5366.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71dee4e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71dee4e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71dee4e6

Branch: refs/heads/master
Commit: 71dee4e6ffdf01be3f0952e183f401f210133b29
Parents: 4e7f281
Author: Bowen Li <[email protected]>
Authored: Thu Jan 25 20:59:38 2018 -0800
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 14:08:49 2018 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 33 ++++++++++----------
 1 file changed, 16 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71dee4e6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 9185ad0..316f41c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -120,7 +120,7 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 /**
- * A {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
+ * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to
  * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
  * checkpointing. This state backend can store very large state that exceeds memory and spills
  * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe.
@@ -139,7 +139,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
        /** File suffix of sstable files. */
        private static final String SST_FILE_SUFFIX = ".sst";
 
-       /** Bytes for the name of the column decriptor for the default column family. */
+       /** Bytes for the name of the column descriptor for the default column family. */
        public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET);
 
        /** String that identifies the operator that owns this backend. */
@@ -154,17 +154,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
        /** Path where this configured instance stores its data directory. */
        private final File instanceBasePath;
 
-       /** Path where this configured instance stores its RocksDB data base. */
+       /** Path where this configured instance stores its RocksDB database. */
        private final File instanceRocksDBPath;
 
        /**
-        * Protects access to RocksDB in other threads, like the checkpointing thread from parallel call that dispose the
+        * Protects access to RocksDB in other threads, like the checkpointing thread from parallel call that disposes the
         * RocksDb object.
         */
        private final ResourceGuard rocksDBResourceGuard;
 
        /**
-        * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
+        * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState}
         * to store state. The different k/v states that we have don't each have their own RocksDB
         * instance. They all write to this instance but to their own column family.
         */
@@ -242,7 +242,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                }
 
                if (!instanceBasePath.mkdirs()) {
-                       throw new IOException("Could not create RocksDB data directory.");
+                       throw new IOException(
+                                       String.format("Could not create RocksDB data directory at %s.", instanceBasePath.getAbsolutePath()));
                }
 
                this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
@@ -275,7 +276,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
        public void dispose() {
                super.dispose();
 
-               // This call will block until all clients that still acquired access to the RocksDB instance have released it,
+               // This call will block until all clients that still acquire access to the RocksDB instance have released it,
                // so that we cannot release the native resources while clients are still working with it in parallel.
                rocksDBResourceGuard.close();
 
@@ -361,8 +362,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
                if (kvStateInformation.isEmpty()) {
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " +
-                                       checkpointTimestamp + " . Returning null.");
+                               LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
+                                               checkpointTimestamp);
                        }
                        return DoneFuture.nullValue();
                }
@@ -409,8 +410,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
                if (kvStateInformation.isEmpty()) {
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
-                                       " . Returning null.");
+                               LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp);
                        }
 
                        return DoneFuture.nullValue();
@@ -472,8 +472,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                }
                        };
 
-               LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
-                       Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
+               LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.",
+                               streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
 
                return AsyncStoppableTaskWithCallback.from(ioCallable);
        }
@@ -1578,7 +1578,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
        /**
         * Creates a column family handle for use with a k/v state. When restoring from a snapshot
-        * we don't restore the individual k/v states, just the global RocksDB data base and the
+        * we don't restore the individual k/v states, just the global RocksDB database and the
         * list of column families. When a k/v state is first requested we check here whether we
         * already have a column family for that and return it or create a new one if it doesn't exist.
         *
@@ -1723,7 +1723,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
        }
 
        /**
-        * Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator.
+        * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator.
         * Used by #MergeIterator.
         */
        static final class MergeIterator implements AutoCloseable {
@@ -1877,7 +1877,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                }
 
                /**
-                * Returns the key-group for the current key.
                 * @return key-group for the current key
                 */
                public int keyGroup() {
@@ -1899,7 +1898,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                }
 
                /**
-                * Returns the Id of the k/v state to which the current key belongs.
                 * @return Id of K/V state to which the current key belongs.
                 */
                public int kvStateId() {
@@ -1955,6 +1953,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
        /**
         * Only visible for testing, DO NOT USE.
         */
+       @VisibleForTesting
        public File getInstanceBasePath() {
                return instanceBasePath;
        }

Reply via email to