[hotfix] improve javadoc and logging of RocksDBKeyedStateBackend

This closes #5366.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71dee4e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71dee4e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71dee4e6 Branch: refs/heads/master Commit: 71dee4e6ffdf01be3f0952e183f401f210133b29 Parents: 4e7f281 Author: Bowen Li <[email protected]> Authored: Thu Jan 25 20:59:38 2018 -0800 Committer: Till Rohrmann <[email protected]> Committed: Fri Jan 26 14:08:49 2018 +0100 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 33 ++++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/71dee4e6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 9185ad0..316f41c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -120,7 +120,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; /** - * A {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to + * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon * 
checkpointing. This state backend can store very large state that exceeds memory and spills * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe. @@ -139,7 +139,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** File suffix of sstable files. */ private static final String SST_FILE_SUFFIX = ".sst"; - /** Bytes for the name of the column decriptor for the default column family. */ + /** Bytes for the name of the column descriptor for the default column family. */ public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET); /** String that identifies the operator that owns this backend. */ @@ -154,17 +154,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** Path where this configured instance stores its data directory. */ private final File instanceBasePath; - /** Path where this configured instance stores its RocksDB data base. */ + /** Path where this configured instance stores its RocksDB database. */ private final File instanceRocksDBPath; /** - * Protects access to RocksDB in other threads, like the checkpointing thread from parallel call that dispose the + * Protects access to RocksDB in other threads, like the checkpointing thread from parallel call that disposes the * RocksDb object. */ private final ResourceGuard rocksDBResourceGuard; /** - * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState} + * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState} * to store state. The different k/v states that we have don't each have their own RocksDB * instance. They all write to this instance but to their own column family. 
*/ @@ -242,7 +242,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } if (!instanceBasePath.mkdirs()) { - throw new IOException("Could not create RocksDB data directory."); + throw new IOException( + String.format("Could not create RocksDB data directory at %s.", instanceBasePath.getAbsolutePath())); } this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; @@ -275,7 +276,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { public void dispose() { super.dispose(); - // This call will block until all clients that still acquired access to the RocksDB instance have released it, + // This call will block until all clients that still acquire access to the RocksDB instance have released it, // so that we cannot release the native resources while clients are still working with it in parallel. rocksDBResourceGuard.close(); @@ -361,8 +362,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (kvStateInformation.isEmpty()) { if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + - checkpointTimestamp + " . Returning null."); + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", + checkpointTimestamp); } return DoneFuture.nullValue(); } @@ -409,8 +410,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (kvStateInformation.isEmpty()) { if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + - " . Returning null."); + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. 
Returning null.", timestamp); } return DoneFuture.nullValue(); @@ -472,8 +472,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } }; - LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " + - Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms."); + LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", + streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); return AsyncStoppableTaskWithCallback.from(ioCallable); } @@ -1578,7 +1578,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Creates a column family handle for use with a k/v state. When restoring from a snapshot - * we don't restore the individual k/v states, just the global RocksDB data base and the + * we don't restore the individual k/v states, just the global RocksDB database and the * list of column families. When a k/v state is first requested we check here whether we * already have a column family for that and return it or create a new one if it doesn't exist. * @@ -1723,7 +1723,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator. + * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator. * Used by #MergeIterator. */ static final class MergeIterator implements AutoCloseable { @@ -1877,7 +1877,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * Returns the key-group for the current key. * @return key-group for the current key */ public int keyGroup() { @@ -1899,7 +1898,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * Returns the Id of the k/v state to which the current key belongs. 
* @return Id of K/V state to which the current key belongs. */ public int kvStateId() { @@ -1955,6 +1953,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Only visible for testing, DO NOT USE. */ + @VisibleForTesting public File getInstanceBasePath() { return instanceBasePath; }
