This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a22069ff8e80678c4d7a33c3ad349f0a4499567c
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Wed Feb 27 19:56:19 2019 +0800

    [FLINK-11772] [DataStream] InternalTimerServiceSerializationProxy should not be serializing timers' key / namespace serializers anymore
    
    The changes made to managed state, whereby we no longer Java-serialize
    serializers and only write the serializer snapshot, were not reflected
    in how we snapshot timers. This was mainly due to the fact that timers
    were not handled by state backends (and were therefore not managed
    state) in the past, and were handled in an isolated manner by the
    InternalTimerServiceSerializationProxy.
    
    This closes #7849.
---
 .../api/operators/InternalTimerServiceImpl.java    | 29 ++++---
 .../InternalTimerServiceSerializationProxy.java    | 18 +++-
 .../api/operators/InternalTimersSnapshot.java      | 30 ++-----
 .../InternalTimersSnapshotReaderWriters.java       | 96 +++++++++++++++++-----
 .../operators/InternalTimerServiceImplTest.java    | 16 +++-
 5 files changed, 124 insertions(+), 65 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index a7a3490..dd88bc6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -265,13 +265,19 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N>,
        public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int 
keyGroupIdx) {
                return new InternalTimersSnapshot<>(
                        keySerializer,
-                       keySerializer.snapshotConfiguration(),
                        namespaceSerializer,
-                       namespaceSerializer.snapshotConfiguration(),
                        eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx),
                        
processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx));
        }
 
+       public TypeSerializer<K> getKeySerializer() {
+               return keySerializer;
+       }
+
+       public TypeSerializer<N> getNamespaceSerializer() {
+               return namespaceSerializer;
+       }
+
        /**
         * Restore the timers (both processing and event time ones) for a given 
{@code keyGroupIdx}.
         *
@@ -283,13 +289,17 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N>,
        public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> 
restoredSnapshot, int keyGroupIdx) {
                this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) 
restoredSnapshot;
 
-               if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
-                       throw new IllegalArgumentException("Tried to restore 
timers " +
-                               "for the same service with different 
serializers.");
+               TypeSerializer<K> restoredKeySerializer = 
restoredTimersSnapshot.getKeySerializerSnapshot().restoreSerializer();
+               if (this.keyDeserializer != null && 
!this.keyDeserializer.equals(restoredKeySerializer)) {
+                       throw new IllegalArgumentException("Tried to restore 
timers for the same service with different key serializers.");
                }
+               this.keyDeserializer = restoredKeySerializer;
 
-               this.keyDeserializer = 
restoredTimersSnapshot.getKeySerializer();
-               this.namespaceDeserializer = 
restoredTimersSnapshot.getNamespaceSerializer();
+               TypeSerializer<N> restoredNamespaceSerializer = 
restoredTimersSnapshot.getNamespaceSerializerSnapshot().restoreSerializer();
+               if (this.namespaceDeserializer != null && 
!this.namespaceDeserializer.equals(restoredNamespaceSerializer)) {
+                       throw new IllegalArgumentException("Tried to restore 
timers for the same service with different namespace serializers.");
+               }
+               this.namespaceDeserializer = restoredNamespaceSerializer;
 
                checkArgument(localKeyGroupRange.contains(keyGroupIdx),
                        "Key Group " + keyGroupIdx + " does not belong to the 
local range.");
@@ -358,9 +368,4 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N>,
                }
                return result;
        }
-
-       private boolean 
areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> 
restoredSnapshot) {
-               return (this.keyDeserializer != null && 
!this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
-                       (this.namespaceDeserializer != null && 
!this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
-       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java
index dea17f9..ca591bcd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java
@@ -35,7 +35,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class InternalTimerServiceSerializationProxy<K> extends 
PostVersionedIOReadableWritable {
 
-       public static final int VERSION = 1;
+       public static final int VERSION = 2;
 
        /** The key-group timer services to write / read. */
        private final InternalTimeServiceManager<K> timerServicesManager;
@@ -75,6 +75,12 @@ public class InternalTimerServiceSerializationProxy<K> 
extends PostVersionedIORe
        }
 
        @Override
+       public int[] getCompatibleVersions() {
+               return new int[] { VERSION, 1 };
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
        public void write(DataOutputView out) throws IOException {
                super.write(out);
                final Map<String, InternalTimerServiceImpl<K, ?>> 
registeredTimerServices =
@@ -87,7 +93,11 @@ public class InternalTimerServiceSerializationProxy<K> 
extends PostVersionedIORe
 
                        out.writeUTF(serviceName);
                        InternalTimersSnapshotReaderWriters
-                               .getWriterForVersion(VERSION, 
timerService.snapshotTimersForKeyGroup(keyGroupIdx))
+                               .getWriterForVersion(
+                                       VERSION,
+                                       
timerService.snapshotTimersForKeyGroup(keyGroupIdx),
+                                       timerService.getKeySerializer(),
+                                       (TypeSerializer) 
timerService.getNamespaceSerializer())
                                .writeTimersSnapshot(out);
                }
        }
@@ -115,8 +125,8 @@ public class InternalTimerServiceSerializationProxy<K> 
extends PostVersionedIORe
        @SuppressWarnings("unchecked")
        private <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
                String serviceName, InternalTimersSnapshot<?, ?> 
restoredTimersSnapshot) {
-               final TypeSerializer<K> keySerializer = (TypeSerializer<K>) 
restoredTimersSnapshot.getKeySerializer();
-               final TypeSerializer<N> namespaceSerializer = 
(TypeSerializer<N>) restoredTimersSnapshot.getNamespaceSerializer();
+               final TypeSerializer<K> keySerializer = (TypeSerializer<K>) 
restoredTimersSnapshot.getKeySerializerSnapshot().restoreSerializer();
+               final TypeSerializer<N> namespaceSerializer = 
(TypeSerializer<N>) 
restoredTimersSnapshot.getNamespaceSerializerSnapshot().restoreSerializer();
                TimerSerializer<K, N> timerSerializer = new 
TimerSerializer<>(keySerializer, namespaceSerializer);
                return 
timerServicesManager.registerOrGetTimerService(serviceName, timerSerializer);
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
index 9c78aa5..d5e9483 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -32,9 +33,7 @@ import java.util.Set;
  */
 public class InternalTimersSnapshot<K, N> {
 
-       private TypeSerializer<K> keySerializer;
        private TypeSerializerSnapshot<K> keySerializerSnapshot;
-       private TypeSerializer<N> namespaceSerializer;
        private TypeSerializerSnapshot<N> namespaceSerializerSnapshot;
 
        private Set<TimerHeapInternalTimer<K, N>> eventTimeTimers;
@@ -46,28 +45,19 @@ public class InternalTimersSnapshot<K, N> {
        /** Constructor to use when snapshotting the timers. */
        public InternalTimersSnapshot(
                        TypeSerializer<K> keySerializer,
-                       TypeSerializerSnapshot<K> keySerializerSnapshot,
                        TypeSerializer<N> namespaceSerializer,
-                       TypeSerializerSnapshot<N> namespaceSerializerSnapshot,
                        @Nullable Set<TimerHeapInternalTimer<K, N>> 
eventTimeTimers,
                        @Nullable Set<TimerHeapInternalTimer<K, N>> 
processingTimeTimers) {
 
-               this.keySerializer = Preconditions.checkNotNull(keySerializer);
-               this.keySerializerSnapshot = 
Preconditions.checkNotNull(keySerializerSnapshot);
-               this.namespaceSerializer = 
Preconditions.checkNotNull(namespaceSerializer);
-               this.namespaceSerializerSnapshot = 
Preconditions.checkNotNull(namespaceSerializerSnapshot);
+               Preconditions.checkNotNull(keySerializer);
+               this.keySerializerSnapshot = 
TypeSerializerUtils.snapshotBackwardsCompatible(keySerializer);
+               Preconditions.checkNotNull(namespaceSerializer);
+               this.namespaceSerializerSnapshot = 
TypeSerializerUtils.snapshotBackwardsCompatible(namespaceSerializer);
+
                this.eventTimeTimers = eventTimeTimers;
                this.processingTimeTimers = processingTimeTimers;
        }
 
-       public TypeSerializer<K> getKeySerializer() {
-               return keySerializer;
-       }
-
-       public void setKeySerializer(TypeSerializer<K> keySerializer) {
-               this.keySerializer = keySerializer;
-       }
-
        public TypeSerializerSnapshot<K> getKeySerializerSnapshot() {
                return keySerializerSnapshot;
        }
@@ -76,14 +66,6 @@ public class InternalTimersSnapshot<K, N> {
                this.keySerializerSnapshot = keySerializerConfigSnapshot;
        }
 
-       public TypeSerializer<N> getNamespaceSerializer() {
-               return namespaceSerializer;
-       }
-
-       public void setNamespaceSerializer(TypeSerializer<N> 
namespaceSerializer) {
-               this.namespaceSerializer = namespaceSerializer;
-       }
-
        public TypeSerializerSnapshot<N> getNamespaceSerializerSnapshot() {
                return namespaceSerializerSnapshot;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
index d61e542..9dcd959 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
@@ -56,16 +56,24 @@ public class InternalTimersSnapshotReaderWriters {
        //  Writers
        //   - pre-versioned: Flink 1.4.0
        //   - v1: Flink 1.4.1
+       //   - v2: Flink 1.8.0
        // 
-------------------------------------------------------------------------------
 
-       public static <K, N> InternalTimersSnapshotWriter 
getWriterForVersion(int version, InternalTimersSnapshot<K, N> timersSnapshot) {
+       public static <K, N> InternalTimersSnapshotWriter getWriterForVersion(
+                       int version,
+                       InternalTimersSnapshot<K, N> timersSnapshot,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) {
 
                switch (version) {
                        case NO_VERSION:
-                               return new 
InternalTimersSnapshotWriterPreVersioned<>(timersSnapshot);
+                               return new 
InternalTimersSnapshotWriterPreVersioned<>(timersSnapshot, keySerializer, 
namespaceSerializer);
+
+                       case 1:
+                               return new 
InternalTimersSnapshotWriterV1<>(timersSnapshot, keySerializer, 
namespaceSerializer);
 
                        case InternalTimerServiceSerializationProxy.VERSION:
-                               return new 
InternalTimersSnapshotWriterV1<>(timersSnapshot);
+                               return new 
InternalTimersSnapshotWriterV2<>(timersSnapshot, keySerializer, 
namespaceSerializer);
 
                        default:
                                // guard for future
@@ -92,8 +100,16 @@ public class InternalTimersSnapshotReaderWriters {
 
                protected final InternalTimersSnapshot<K, N> timersSnapshot;
 
-               public 
AbstractInternalTimersSnapshotWriter(InternalTimersSnapshot<K, N> 
timersSnapshot) {
+               protected final TypeSerializer<K> keySerializer;
+               protected final TypeSerializer<N> namespaceSerializer;
+
+               public AbstractInternalTimersSnapshotWriter(
+                               InternalTimersSnapshot<K, N> timersSnapshot,
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer) {
                        this.timersSnapshot = checkNotNull(timersSnapshot);
+                       this.keySerializer = checkNotNull(keySerializer);
+                       this.namespaceSerializer = 
checkNotNull(namespaceSerializer);
                }
 
                protected abstract void 
writeKeyAndNamespaceSerializers(DataOutputView out) throws IOException;
@@ -103,8 +119,8 @@ public class InternalTimersSnapshotReaderWriters {
                        writeKeyAndNamespaceSerializers(out);
 
                        LegacyTimerSerializer<K, N> timerSerializer = new 
LegacyTimerSerializer<>(
-                               timersSnapshot.getKeySerializer(),
-                               timersSnapshot.getNamespaceSerializer());
+                               keySerializer,
+                               namespaceSerializer);
 
                        // write the event time timers
                        Set<TimerHeapInternalTimer<K, N>> eventTimers = 
timersSnapshot.getEventTimeTimers();
@@ -132,16 +148,19 @@ public class InternalTimersSnapshotReaderWriters {
 
        private static class InternalTimersSnapshotWriterPreVersioned<K, N> 
extends AbstractInternalTimersSnapshotWriter<K, N> {
 
-               public 
InternalTimersSnapshotWriterPreVersioned(InternalTimersSnapshot<K, N> 
timersSnapshot) {
-                       super(timersSnapshot);
+               public InternalTimersSnapshotWriterPreVersioned(
+                               InternalTimersSnapshot<K, N> timersSnapshot,
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer) {
+                       super(timersSnapshot, keySerializer, 
namespaceSerializer);
                }
 
                @Override
                protected void writeKeyAndNamespaceSerializers(DataOutputView 
out) throws IOException {
                        // the pre-versioned format only serializes the 
serializers, without their configuration snapshots
                        try (ByteArrayOutputStreamWithPos stream = new 
ByteArrayOutputStreamWithPos()) {
-                               InstantiationUtil.serializeObject(stream, 
timersSnapshot.getKeySerializer());
-                               InstantiationUtil.serializeObject(stream, 
timersSnapshot.getNamespaceSerializer());
+                               InstantiationUtil.serializeObject(stream, 
keySerializer);
+                               InstantiationUtil.serializeObject(stream, 
namespaceSerializer);
 
                                out.write(stream.getBuf(), 0, 
stream.getPosition());
                        }
@@ -150,8 +169,11 @@ public class InternalTimersSnapshotReaderWriters {
 
        private static class InternalTimersSnapshotWriterV1<K, N> extends 
AbstractInternalTimersSnapshotWriter<K, N> {
 
-               public InternalTimersSnapshotWriterV1(InternalTimersSnapshot<K, 
N> timersSnapshot) {
-                       super(timersSnapshot);
+               public InternalTimersSnapshotWriterV1(
+                               InternalTimersSnapshot<K, N> timersSnapshot,
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer) {
+                       super(timersSnapshot, keySerializer, 
namespaceSerializer);
                }
 
                @Override
@@ -160,8 +182,24 @@ public class InternalTimersSnapshotReaderWriters {
                        
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
                                out,
                                Arrays.asList(
-                                       
Tuple2.of(timersSnapshot.getKeySerializer(), 
timersSnapshot.getKeySerializerSnapshot()),
-                                       
Tuple2.of(timersSnapshot.getNamespaceSerializer(), 
timersSnapshot.getNamespaceSerializerSnapshot())));
+                                       Tuple2.of(keySerializer, 
timersSnapshot.getKeySerializerSnapshot()),
+                                       Tuple2.of(namespaceSerializer, 
timersSnapshot.getNamespaceSerializerSnapshot())));
+               }
+       }
+
+       private static class InternalTimersSnapshotWriterV2<K, N> extends 
AbstractInternalTimersSnapshotWriter<K, N> {
+
+               public InternalTimersSnapshotWriterV2(
+                               InternalTimersSnapshot<K, N> timersSnapshot,
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer) {
+                       super(timersSnapshot, keySerializer, 
namespaceSerializer);
+               }
+
+               @Override
+               protected void writeKeyAndNamespaceSerializers(DataOutputView 
out) throws IOException {
+                       TypeSerializerSnapshot.writeVersionedSnapshot(out, 
timersSnapshot.getKeySerializerSnapshot());
+                       TypeSerializerSnapshot.writeVersionedSnapshot(out, 
timersSnapshot.getNamespaceSerializerSnapshot());
                }
        }
 
@@ -178,9 +216,12 @@ public class InternalTimersSnapshotReaderWriters {
                        case NO_VERSION:
                                return new 
InternalTimersSnapshotReaderPreVersioned<>(userCodeClassLoader);
 
-                       case InternalTimerServiceSerializationProxy.VERSION:
+                       case 1:
                                return new 
InternalTimersSnapshotReaderV1<>(userCodeClassLoader);
 
+                       case InternalTimerServiceSerializationProxy.VERSION:
+                               return new 
InternalTimersSnapshotReaderV2<>(userCodeClassLoader);
+
                        default:
                                // guard for future
                                throw new IllegalStateException(
@@ -223,8 +264,8 @@ public class InternalTimersSnapshotReaderWriters {
 
                        LegacyTimerSerializer<K, N> timerSerializer =
                                new LegacyTimerSerializer<>(
-                                       
restoredTimersSnapshot.getKeySerializer(),
-                                       
restoredTimersSnapshot.getNamespaceSerializer());
+                                       
restoredTimersSnapshot.getKeySerializerSnapshot().restoreSerializer(),
+                                       
restoredTimersSnapshot.getNamespaceSerializerSnapshot().restoreSerializer());
 
                        // read the event time timers
                        int sizeOfEventTimeTimers = in.readInt();
@@ -269,9 +310,7 @@ public class InternalTimersSnapshotReaderWriters {
                                final TypeSerializer<K> keySerializer = 
InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true);
                                final TypeSerializer<N> namespaceSerializer = 
InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true);
 
-                               
restoredTimersSnapshot.setKeySerializer(keySerializer);
                                
restoredTimersSnapshot.setKeySerializerSnapshot(new 
BackwardsCompatibleSerializerSnapshot<>(keySerializer));
-                               
restoredTimersSnapshot.setNamespaceSerializer(namespaceSerializer);
                                
restoredTimersSnapshot.setNamespaceSerializerSnapshot(new 
BackwardsCompatibleSerializerSnapshot<>(namespaceSerializer));
                        } catch (ClassNotFoundException exception) {
                                throw new IOException(exception);
@@ -294,13 +333,28 @@ public class InternalTimersSnapshotReaderWriters {
                        List<Tuple2<TypeSerializer<?>, 
TypeSerializerSnapshot<?>>> serializersAndConfigs =
                                
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader);
 
-                       
restoredTimersSnapshot.setKeySerializer((TypeSerializer<K>) 
serializersAndConfigs.get(0).f0);
                        
restoredTimersSnapshot.setKeySerializerSnapshot((TypeSerializerSnapshot<K>) 
serializersAndConfigs.get(0).f1);
-                       
restoredTimersSnapshot.setNamespaceSerializer((TypeSerializer<N>) 
serializersAndConfigs.get(1).f0);
                        
restoredTimersSnapshot.setNamespaceSerializerSnapshot((TypeSerializerSnapshot<N>)
 serializersAndConfigs.get(1).f1);
                }
        }
 
+       private static class InternalTimersSnapshotReaderV2<K, N> extends 
AbstractInternalTimersSnapshotReader<K, N> {
+
+               public InternalTimersSnapshotReaderV2(ClassLoader 
userCodeClassLoader) {
+                       super(userCodeClassLoader);
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               protected void restoreKeyAndNamespaceSerializers(
+                       InternalTimersSnapshot<K, N> restoredTimersSnapshot,
+                       DataInputView in) throws IOException {
+
+                       
restoredTimersSnapshot.setKeySerializerSnapshot(TypeSerializerSnapshot.readVersionedSnapshot(in,
 userCodeClassLoader));
+                       
restoredTimersSnapshot.setNamespaceSerializerSnapshot(TypeSerializerSnapshot.readVersionedSnapshot(in,
 userCodeClassLoader));
+               }
+       }
+
        /**
         * A {@link TypeSerializer} used to serialize/deserialize a {@link 
TimerHeapInternalTimer}.
         */
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
index f2da6da..99d2700 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
@@ -620,10 +620,14 @@ public class InternalTimerServiceImplTest {
                Map<Integer, byte[]> snapshot = new HashMap<>();
                for (Integer keyGroupIndex : testKeyGroupRange) {
                        try (ByteArrayOutputStream outStream = new 
ByteArrayOutputStream()) {
-                               InternalTimersSnapshot<?, ?> timersSnapshot = 
timerService.snapshotTimersForKeyGroup(keyGroupIndex);
+                               InternalTimersSnapshot<Integer, String> 
timersSnapshot = timerService.snapshotTimersForKeyGroup(keyGroupIndex);
 
                                InternalTimersSnapshotReaderWriters
-                                       .getWriterForVersion(snapshotVersion, 
timersSnapshot)
+                                       .getWriterForVersion(
+                                               snapshotVersion,
+                                               timersSnapshot,
+                                               timerService.getKeySerializer(),
+                                               
timerService.getNamespaceSerializer())
                                        .writeTimersSnapshot(new 
DataOutputViewStreamWrapper(outStream));
 
                                snapshot.put(keyGroupIndex, 
outStream.toByteArray());
@@ -701,10 +705,14 @@ public class InternalTimerServiceImplTest {
                Map<Integer, byte[]> snapshot2 = new HashMap<>();
                for (Integer keyGroupIndex : testKeyGroupRange) {
                        try (ByteArrayOutputStream outStream = new 
ByteArrayOutputStream()) {
-                               InternalTimersSnapshot<?, ?> timersSnapshot = 
timerService.snapshotTimersForKeyGroup(keyGroupIndex);
+                               InternalTimersSnapshot<Integer, String> 
timersSnapshot = timerService.snapshotTimersForKeyGroup(keyGroupIndex);
 
                                InternalTimersSnapshotReaderWriters
-                                       .getWriterForVersion(snapshotVersion, 
timersSnapshot)
+                                       .getWriterForVersion(
+                                               snapshotVersion,
+                                               timersSnapshot,
+                                               timerService.getKeySerializer(),
+                                               
timerService.getNamespaceSerializer())
                                        .writeTimersSnapshot(new 
DataOutputViewStreamWrapper(outStream));
 
                                if (subKeyGroupRange1.contains(keyGroupIndex)) {

Reply via email to