[hotfix] Remove some raw type usage in RocksDBKeyedStateBackend. Introduce more generic parameters.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c573540 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c573540 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c573540 Branch: refs/heads/release-1.3 Commit: 6c573540f1e558192cbd3763446a9e6dd848efce Parents: cfb6a69 Author: Till Rohrmann <[email protected]> Authored: Thu May 18 17:05:50 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu May 18 23:16:27 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 69 +++++++++----------- .../state/KeyedBackendSerializationProxy.java | 10 +-- .../state/heap/HeapKeyedStateBackend.java | 12 ++-- .../runtime/state/SerializationProxiesTest.java | 12 ++-- 4 files changed, 49 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index ddc7e17..d0f73bf 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -123,8 +123,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class); - private final JobID jobId; - private 
final String operatorIdentifier; /** The column family options from the options factory */ @@ -165,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * TODO this map can be removed when eager-state registration is in place. * TODO we currently need this cached to check state migration strategies when new serializers are registered. */ - private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot> restoredKvStateMetaInfos; + private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos; /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; @@ -198,7 +196,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); - this.jobId = Preconditions.checkNotNull(jobId); this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; @@ -314,8 +311,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final long checkpointTimestamp, final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - final RocksDBIncrementalSnapshotOperation snapshotOperation = - new RocksDBIncrementalSnapshotOperation( + final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = + new RocksDBIncrementalSnapshotOperation<>( this, checkpointStreamFactory, checkpointId, @@ -365,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { long startTime = System.currentTimeMillis(); - final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(this, streamFactory); + final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory); // hold the db lock while operation on the db to guard us against async db 
disposal synchronized (asyncSnapshotLock) { @@ -440,12 +437,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. */ - static final class RocksDBFullSnapshotOperation { + static final class RocksDBFullSnapshotOperation<K> { static final int FIRST_BIT_IN_BYTE_MASK = 0x80; static final int END_OF_KEY_GROUP_MARK = 0xFFFF; - private final RocksDBKeyedStateBackend<?> stateBackend; + private final RocksDBKeyedStateBackend<K> stateBackend; private final KeyGroupRangeOffsets keyGroupRangeOffsets; private final CheckpointStreamFactory checkpointStreamFactory; @@ -461,7 +458,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private KeyGroupsStateHandle snapshotResultStateHandle; RocksDBFullSnapshotOperation( - RocksDBKeyedStateBackend<?> stateBackend, + RocksDBKeyedStateBackend<K> stateBackend, CheckpointStreamFactory checkpointStreamFactory) { this.stateBackend = stateBackend; @@ -601,8 +598,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ++kvStateId; } - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoSnapshots); + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots); serializationProxy.write(outputView); } @@ -710,10 +707,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - private static final class RocksDBIncrementalSnapshotOperation { + private static final class RocksDBIncrementalSnapshotOperation<K> { /** The backend which we snapshot */ - private final RocksDBKeyedStateBackend<?> stateBackend; + private final RocksDBKeyedStateBackend<K> stateBackend; /** Stream factory that creates the outpus streams to DFS */ private final CheckpointStreamFactory checkpointStreamFactory; 
@@ -748,7 +745,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private StreamStateHandle metaStateHandle = null; private RocksDBIncrementalSnapshotOperation( - RocksDBKeyedStateBackend<?> stateBackend, + RocksDBKeyedStateBackend<K> stateBackend, CheckpointStreamFactory checkpointStreamFactory, long checkpointId, long checkpointTimestamp) { @@ -810,8 +807,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); closeableRegistry.registerClosable(outputStream); - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots); + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots); DataOutputView out = new DataOutputViewStreamWrapper(outputStream); serializationProxy.write(out); @@ -964,10 +961,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LOG.info("Converting RocksDB state from old savepoint."); restoreOldSavepointKeyedState(restoreState); } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { - RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this); + RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); restoreOperation.restore(restoreState); } else { - RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this); + RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this); restoreOperation.doRestore(restoreState); } } catch (Exception ex) { @@ -1037,9 +1034,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot. 
*/ - static final class RocksDBFullRestoreOperation { + static final class RocksDBFullRestoreOperation<K> { - private final RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend; + private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; /** Current key-groups state handle from which we restore key-groups */ private KeyGroupsStateHandle currentKeyGroupsStateHandle; @@ -1055,7 +1052,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * * @param rocksDBKeyedStateBackend the state backend into which we restore */ - public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) { + public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) { this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); } @@ -1116,11 +1113,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * @throws ClassNotFoundException * @throws RocksDBException */ - @SuppressWarnings("unchecked") - private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException { + private void restoreKVStateMetaData() throws IOException, RocksDBException { - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader); + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); serializationProxy.read(currentStateHandleInView); @@ -1130,7 +1126,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.getKeySerializer(), TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, serializationProxy.getKeySerializerConfigSnapshot(), - (TypeSerializer) rocksDBKeyedStateBackend.keySerializer) + rocksDBKeyedStateBackend.keySerializer) .isRequiresMigration()) { // TODO replace with state migration; note that key hash 
codes need to remain the same after migration @@ -1221,15 +1217,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - private static class RocksDBIncrementalRestoreOperation { + private static class RocksDBIncrementalRestoreOperation<T> { - private final RocksDBKeyedStateBackend<?> stateBackend; + private final RocksDBKeyedStateBackend<T> stateBackend; - private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) { + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) { this.stateBackend = stateBackend; } - @SuppressWarnings("unchecked") private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData( StreamStateHandle metaStateHandle) throws Exception { @@ -1239,8 +1234,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { inputStream = metaStateHandle.openInputStream(); stateBackend.cancelStreamRegistry.registerClosable(inputStream); - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + KeyedBackendSerializationProxy<T> serializationProxy = + new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader); DataInputView in = new DataInputViewStreamWrapper(inputStream); serializationProxy.read(in); @@ -1250,7 +1245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.getKeySerializer(), TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, serializationProxy.getKeySerializerConfigSnapshot(), - (TypeSerializer) stateBackend.keySerializer) + stateBackend.keySerializer) .isRequiresMigration()) { // TODO replace with state migration; note that key hash codes need to remain the same after migration @@ -1536,7 +1531,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // TODO with eager registration in place, these checks should be moved to restore() 
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo = - restoredKvStateMetaInfos.get(descriptor.getName()); + (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName()); Preconditions.checkState( newMetaInfo.getName().equals(restoredMetaInfo.getName()), @@ -1556,7 +1551,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // check compatibility results to determine if state migration is required - CompatibilityResult<N> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult( + CompatibilityResult<?> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult( restoredMetaInfo.getNamespaceSerializer(), MigrationNamespaceSerializerProxy.class, restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), @@ -1929,7 +1924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { new InstantiationUtil.ClassLoaderObjectInputStream( new DataInputViewStream(inputView), userCodeClassLoader); - StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject(); + StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) ooIn.readObject(); columnFamilyMapping.put(mappingByte, stateDescriptor); http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java index 94fb9f1..f265f78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java @@ -39,11 +39,11 @@ import java.util.List; * Serialization proxy for all meta data in 
keyed state backends. In the future we might also requiresMigration the actual state * serialization logic here. */ -public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable { +public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable { public static final int VERSION = 3; - private TypeSerializer<?> keySerializer; + private TypeSerializer<K> keySerializer; private TypeSerializerConfigSnapshot keySerializerConfigSnapshot; private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots; @@ -55,7 +55,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable } public KeyedBackendSerializationProxy( - TypeSerializer<?> keySerializer, + TypeSerializer<K> keySerializer, List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { this.keySerializer = Preconditions.checkNotNull(keySerializer); @@ -70,7 +70,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable return stateMetaInfoSnapshots; } - public TypeSerializer<?> getKeySerializer() { + public TypeSerializer<K> getKeySerializer() { return keySerializer; } @@ -122,7 +122,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable public void read(DataInputView in) throws IOException { super.read(in); - final TypeSerializerSerializationProxy<?> keySerializerProxy = + final TypeSerializerSerializationProxy<K> keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader); // only starting from version 3, we have the key serializer and its config snapshot written http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 6eb314b..3e5645b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -272,8 +272,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - final KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots); + final KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots); //--------------------------------------------------- this becomes the end of sync part @@ -383,8 +383,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { try { DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream); - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(userCodeClassLoader); + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(userCodeClassLoader); serializationProxy.read(inView); @@ -395,7 +395,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.getKeySerializer(), TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, serializationProxy.getKeySerializerConfigSnapshot(), - (TypeSerializer) keySerializer) + keySerializer) .isRequiresMigration()) { // TODO replace with state migration; note that key hash codes need to remain the same after migration @@ -405,7 +405,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { keySerializerRestored = true; } - + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java index 8bbbd5f..3d5b210 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java @@ -66,8 +66,8 @@ public class SerializationProxiesTest { stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot()); - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList); + KeyedBackendSerializationProxy<?> serializationProxy = + new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList); byte[] serialized; try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { @@ -76,7 +76,7 @@ public class SerializationProxiesTest { } serializationProxy = - new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader()); + new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader()); try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { serializationProxy.read(new DataInputViewStreamWrapper(in)); @@ -103,8 +103,8 @@ public class SerializationProxiesTest { stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot()); - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList); + KeyedBackendSerializationProxy<?> serializationProxy = + new 
KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList); byte[] serialized; try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { @@ -113,7 +113,7 @@ public class SerializationProxiesTest { } serializationProxy = - new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader()); + new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader()); // mock failure when deserializing serializers TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
