This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new c218097  [FLINK-10567][state] Fix TtlStateSerializer does not propagate 
all field serializer during duplicate()
c218097 is described below

commit c2180972fc6d2772436b174acf6541928822185b
Author: minwenjun <[email protected]>
AuthorDate: Tue Oct 16 17:45:34 2018 +0200

    [FLINK-10567][state] Fix TtlStateSerializer does not propagate all field 
serializer during duplicate()
    
    This closes #6860.
---
 .../flink/runtime/state/ttl/TtlStateFactory.java      |  2 +-
 .../flink/runtime/state/StateBackendTestBase.java     | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 2f291d3..45f4e3b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -224,7 +224,7 @@ public class TtlStateFactory<N, SV, S extends State, IS 
extends S> {
                        TypeSerializer<?> ... originalSerializers) {
                        Preconditions.checkNotNull(originalSerializers);
                        Preconditions.checkArgument(originalSerializers.length 
== 2);
-                       return new TtlSerializer<>(precomputed, 
(TypeSerializer<T>) originalSerializers[1]);
+                       return new TtlSerializer<>(precomputed, 
originalSerializers);
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 23ad792..a08ce27 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -33,8 +33,10 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
@@ -1341,6 +1343,23 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                backend.dispose();
        }
 
+       @Test
+       public void testValueStateWorkWithTtl() throws Exception {
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+               try {
+                       ValueStateDescriptor<MutableLong> kvId = new 
ValueStateDescriptor<>("id", MutableLong.class);
+                       
kvId.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(1)).build());
+
+                       ValueState<MutableLong> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       backend.setCurrentKey(1);
+                       state.update(new MutableLong());
+                       state.value();
+               } finally {
+                       backend.close();
+                       backend.dispose();
+               }
+       }
+
        /**
         * Tests {@link ValueState#value()} and
         * {@link InternalKvState#getSerializedValue(byte[], TypeSerializer, 
TypeSerializer, TypeSerializer)}

Reply via email to