[hotfix] Remove some raw type usage in RocksDBKeyedStateBackend

Introduce more generic parameters


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c573540
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c573540
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c573540

Branch: refs/heads/release-1.3
Commit: 6c573540f1e558192cbd3763446a9e6dd848efce
Parents: cfb6a69
Author: Till Rohrmann <[email protected]>
Authored: Thu May 18 17:05:50 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu May 18 23:16:27 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 69 +++++++++-----------
 .../state/KeyedBackendSerializationProxy.java   | 10 +--
 .../state/heap/HeapKeyedStateBackend.java       | 12 ++--
 .../runtime/state/SerializationProxiesTest.java | 12 ++--
 4 files changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ddc7e17..d0f73bf 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -123,8 +123,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
 
-       private final JobID jobId;
-
        private final String operatorIdentifier;
 
        /** The column family options from the options factory */
@@ -165,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * TODO this map can be removed when eager-state registration is in 
place.
         * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
         */
-       private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot> 
restoredKvStateMetaInfos;
+       private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
restoredKvStateMetaInfos;
 
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
@@ -198,7 +196,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
 
-               this.jobId = Preconditions.checkNotNull(jobId);
                this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
 
                this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
@@ -314,8 +311,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        final long checkpointTimestamp,
                        final CheckpointStreamFactory checkpointStreamFactory) 
throws Exception {
 
-               final RocksDBIncrementalSnapshotOperation snapshotOperation =
-                       new RocksDBIncrementalSnapshotOperation(
+               final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
+                       new RocksDBIncrementalSnapshotOperation<>(
                                this,
                                checkpointStreamFactory,
                                checkpointId,
@@ -365,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                long startTime = System.currentTimeMillis();
 
-               final RocksDBFullSnapshotOperation snapshotOperation = new 
RocksDBFullSnapshotOperation(this, streamFactory);
+               final RocksDBFullSnapshotOperation<K> snapshotOperation = new 
RocksDBFullSnapshotOperation<>(this, streamFactory);
                // hold the db lock while operation on the db to guard us 
against async db disposal
                synchronized (asyncSnapshotLock) {
 
@@ -440,12 +437,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /**
         * Encapsulates the process to perform a snapshot of a 
RocksDBKeyedStateBackend.
         */
-       static final class RocksDBFullSnapshotOperation {
+       static final class RocksDBFullSnapshotOperation<K> {
 
                static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
                static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
 
-               private final RocksDBKeyedStateBackend<?> stateBackend;
+               private final RocksDBKeyedStateBackend<K> stateBackend;
                private final KeyGroupRangeOffsets keyGroupRangeOffsets;
                private final CheckpointStreamFactory checkpointStreamFactory;
 
@@ -461,7 +458,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private KeyGroupsStateHandle snapshotResultStateHandle;
 
                RocksDBFullSnapshotOperation(
-                               RocksDBKeyedStateBackend<?> stateBackend,
+                               RocksDBKeyedStateBackend<K> stateBackend,
                                CheckpointStreamFactory 
checkpointStreamFactory) {
 
                        this.stateBackend = stateBackend;
@@ -601,8 +598,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                ++kvStateId;
                        }
 
-                       KeyedBackendSerializationProxy serializationProxy =
-                                       new 
KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), 
metaInfoSnapshots);
+                       KeyedBackendSerializationProxy<K> serializationProxy =
+                                       new 
KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), 
metaInfoSnapshots);
 
                        serializationProxy.write(outputView);
                }
@@ -710,10 +707,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
-       private static final class RocksDBIncrementalSnapshotOperation {
+       private static final class RocksDBIncrementalSnapshotOperation<K> {
 
                /** The backend which we snapshot */
-               private final RocksDBKeyedStateBackend<?> stateBackend;
+               private final RocksDBKeyedStateBackend<K> stateBackend;
 
                /** Stream factory that creates the outpus streams to DFS */
                private final CheckpointStreamFactory checkpointStreamFactory;
@@ -748,7 +745,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private StreamStateHandle metaStateHandle = null;
 
                private RocksDBIncrementalSnapshotOperation(
-                               RocksDBKeyedStateBackend<?> stateBackend,
+                               RocksDBKeyedStateBackend<K> stateBackend,
                                CheckpointStreamFactory checkpointStreamFactory,
                                long checkpointId,
                                long checkpointTimestamp) {
@@ -810,8 +807,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
                                
closeableRegistry.registerClosable(outputStream);
 
-                               KeyedBackendSerializationProxy 
serializationProxy =
-                                       new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, 
stateMetaInfoSnapshots);
+                               KeyedBackendSerializationProxy<K> 
serializationProxy =
+                                       new 
KeyedBackendSerializationProxy<>(stateBackend.keySerializer, 
stateMetaInfoSnapshots);
                                DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
 
                                serializationProxy.write(out);
@@ -964,10 +961,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                LOG.info("Converting RocksDB state from old 
savepoint.");
                                restoreOldSavepointKeyedState(restoreState);
                        } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
-                               RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation(this);
+                               RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
                                restoreOperation.restore(restoreState);
                        } else {
-                               RocksDBFullRestoreOperation restoreOperation = 
new RocksDBFullRestoreOperation(this);
+                               RocksDBFullRestoreOperation<K> restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
                                restoreOperation.doRestore(restoreState);
                        }
                } catch (Exception ex) {
@@ -1037,9 +1034,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /**
         * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a snapshot.
         */
-       static final class RocksDBFullRestoreOperation {
+       static final class RocksDBFullRestoreOperation<K> {
 
-               private final RocksDBKeyedStateBackend<?> 
rocksDBKeyedStateBackend;
+               private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
 
                /** Current key-groups state handle from which we restore 
key-groups */
                private KeyGroupsStateHandle currentKeyGroupsStateHandle;
@@ -1055,7 +1052,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 *
                 * @param rocksDBKeyedStateBackend the state backend into which 
we restore
                 */
-               public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<?> 
rocksDBKeyedStateBackend) {
+               public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
                        this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
                }
 
@@ -1116,11 +1113,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @throws ClassNotFoundException
                 * @throws RocksDBException
                 */
-               @SuppressWarnings("unchecked")
-               private void restoreKVStateMetaData() throws IOException, 
ClassNotFoundException, RocksDBException {
+               private void restoreKVStateMetaData() throws IOException, 
RocksDBException {
 
-                       KeyedBackendSerializationProxy serializationProxy =
-                                       new 
KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);
+                       KeyedBackendSerializationProxy<K> serializationProxy =
+                                       new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
 
                        serializationProxy.read(currentStateHandleInView);
 
@@ -1130,7 +1126,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        serializationProxy.getKeySerializer(),
                                        
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
                                        
serializationProxy.getKeySerializerConfigSnapshot(),
-                                       (TypeSerializer) 
rocksDBKeyedStateBackend.keySerializer)
+                                       rocksDBKeyedStateBackend.keySerializer)
                                .isRequiresMigration()) {
 
                                // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
@@ -1221,15 +1217,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
-       private static class RocksDBIncrementalRestoreOperation {
+       private static class RocksDBIncrementalRestoreOperation<T> {
 
-               private final RocksDBKeyedStateBackend<?> stateBackend;
+               private final RocksDBKeyedStateBackend<T> stateBackend;
 
-               private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
+               private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
                        this.stateBackend = stateBackend;
                }
 
-               @SuppressWarnings("unchecked")
                private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
                                StreamStateHandle metaStateHandle) throws 
Exception {
 
@@ -1239,8 +1234,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                inputStream = metaStateHandle.openInputStream();
                                
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
 
-                               KeyedBackendSerializationProxy 
serializationProxy =
-                                       new 
KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+                               KeyedBackendSerializationProxy<T> 
serializationProxy =
+                                       new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
                                DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
                                serializationProxy.read(in);
 
@@ -1250,7 +1245,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                
serializationProxy.getKeySerializer(),
                                                
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
                                                
serializationProxy.getKeySerializerConfigSnapshot(),
-                                               (TypeSerializer) 
stateBackend.keySerializer)
+                                               stateBackend.keySerializer)
                                        .isRequiresMigration()) {
 
                                        // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
@@ -1536,7 +1531,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        // TODO with eager registration in place, these checks 
should be moved to restore()
 
                        RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
restoredMetaInfo =
-                               
restoredKvStateMetaInfos.get(descriptor.getName());
+                               
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) 
restoredKvStateMetaInfos.get(descriptor.getName());
 
                        Preconditions.checkState(
                                
newMetaInfo.getName().equals(restoredMetaInfo.getName()),
@@ -1556,7 +1551,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        // check compatibility results to determine if state 
migration is required
 
-                       CompatibilityResult<N> namespaceCompatibility = 
StateMigrationUtil.resolveCompatibilityResult(
+                       CompatibilityResult<?> namespaceCompatibility = 
StateMigrationUtil.resolveCompatibilityResult(
                                        
restoredMetaInfo.getNamespaceSerializer(),
                                        MigrationNamespaceSerializerProxy.class,
                                        
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
@@ -1929,7 +1924,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        new 
InstantiationUtil.ClassLoaderObjectInputStream(
                                                        new 
DataInputViewStream(inputView), userCodeClassLoader);
 
-                       StateDescriptor stateDescriptor = (StateDescriptor) 
ooIn.readObject();
+                       StateDescriptor<?, ?> stateDescriptor = 
(StateDescriptor<?, ?>) ooIn.readObject();
 
                        columnFamilyMapping.put(mappingByte, stateDescriptor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 94fb9f1..f265f78 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -39,11 +39,11 @@ import java.util.List;
  * Serialization proxy for all meta data in keyed state backends. In the 
future we might also requiresMigration the actual state
  * serialization logic here.
  */
-public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable {
+public class KeyedBackendSerializationProxy<K> extends 
VersionedIOReadableWritable {
 
        public static final int VERSION = 3;
 
-       private TypeSerializer<?> keySerializer;
+       private TypeSerializer<K> keySerializer;
        private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
 
        private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoSnapshots;
@@ -55,7 +55,7 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
        }
 
        public KeyedBackendSerializationProxy(
-                       TypeSerializer<?> keySerializer,
+                       TypeSerializer<K> keySerializer,
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) {
 
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -70,7 +70,7 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                return stateMetaInfoSnapshots;
        }
 
-       public TypeSerializer<?> getKeySerializer() {
+       public TypeSerializer<K> getKeySerializer() {
                return keySerializer;
        }
 
@@ -122,7 +122,7 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
        public void read(DataInputView in) throws IOException {
                super.read(in);
 
-               final TypeSerializerSerializationProxy<?> keySerializerProxy =
+               final TypeSerializerSerializationProxy<K> keySerializerProxy =
                        new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
 
                // only starting from version 3, we have the key serializer and 
its config snapshot written

http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 6eb314b..3e5645b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -272,8 +272,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
                }
 
-               final KeyedBackendSerializationProxy serializationProxy =
-                               new 
KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots);
+               final KeyedBackendSerializationProxy<K> serializationProxy =
+                               new 
KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots);
 
                //--------------------------------------------------- this 
becomes the end of sync part
 
@@ -383,8 +383,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        try {
                                DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(fsDataInputStream);
 
-                               KeyedBackendSerializationProxy 
serializationProxy =
-                                               new 
KeyedBackendSerializationProxy(userCodeClassLoader);
+                               KeyedBackendSerializationProxy<K> 
serializationProxy =
+                                               new 
KeyedBackendSerializationProxy<>(userCodeClassLoader);
 
                                serializationProxy.read(inView);
 
@@ -395,7 +395,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                
serializationProxy.getKeySerializer(),
                                                
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
                                                
serializationProxy.getKeySerializerConfigSnapshot(),
-                                               (TypeSerializer) keySerializer)
+                                               keySerializer)
                                                .isRequiresMigration()) {
 
                                                // TODO replace with state 
migration; note that key hash codes need to remain the same after migration
@@ -405,7 +405,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                        keySerializerRestored = true;
                                }
-                               
+
                                
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
                                                
serializationProxy.getStateMetaInfoSnapshots();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 8bbbd5f..3d5b210 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -66,8 +66,8 @@ public class SerializationProxiesTest {
                stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
                        StateDescriptor.Type.VALUE, "c", namespaceSerializer, 
stateSerializer).snapshot());
 
-               KeyedBackendSerializationProxy serializationProxy =
-                               new 
KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+               KeyedBackendSerializationProxy<?> serializationProxy =
+                               new 
KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
@@ -76,7 +76,7 @@ public class SerializationProxiesTest {
                }
 
                serializationProxy =
-                               new 
KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+                               new 
KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
@@ -103,8 +103,8 @@ public class SerializationProxiesTest {
                stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
                        StateDescriptor.Type.VALUE, "c", namespaceSerializer, 
stateSerializer).snapshot());
 
-               KeyedBackendSerializationProxy serializationProxy =
-                       new KeyedBackendSerializationProxy(keySerializer, 
stateMetaInfoList);
+               KeyedBackendSerializationProxy<?> serializationProxy =
+                       new KeyedBackendSerializationProxy<>(keySerializer, 
stateMetaInfoList);
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
@@ -113,7 +113,7 @@ public class SerializationProxiesTest {
                }
 
                serializationProxy =
-                       new 
KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+                       new 
KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
                // mock failure when deserializing serializers
                TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);

Reply via email to