This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4be1afa [FLINK-10567][state] Fix TtlStateSerializer does not propagate
all field serializers during duplicate()
4be1afa is described below
commit 4be1afa3a65d23f468c109a87378c3df99fb9c86
Author: minwenjun <[email protected]>
AuthorDate: Tue Oct 16 23:45:34 2018 +0800
[FLINK-10567][state] Fix TtlStateSerializer does not propagate all field
serializers during duplicate()
This closes #6860.
---
.../flink/runtime/state/ttl/TtlStateFactory.java | 2 +-
.../flink/runtime/state/StateBackendTestBase.java | 19 +++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 2f291d3..45f4e3b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -224,7 +224,7 @@ public class TtlStateFactory<N, SV, S extends State, IS
extends S> {
TypeSerializer<?> ... originalSerializers) {
Preconditions.checkNotNull(originalSerializers);
Preconditions.checkArgument(originalSerializers.length
== 2);
- return new TtlSerializer<>(precomputed,
(TypeSerializer<T>) originalSerializers[1]);
+ return new TtlSerializer<>(precomputed,
originalSerializers);
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 0712b64..0a02a23 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -33,8 +33,10 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
@@ -1341,6 +1343,23 @@ public abstract class StateBackendTestBase<B extends
AbstractStateBackend> exten
backend.dispose();
}
+ @Test
+ public void testValueStateWorkWithTtl() throws Exception {
+ AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
+ try {
+ ValueStateDescriptor<MutableLong> kvId = new
ValueStateDescriptor<>("id", MutableLong.class);
+
kvId.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(1)).build());
+
+ ValueState<MutableLong> state =
backend.getPartitionedState(VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+ backend.setCurrentKey(1);
+ state.update(new MutableLong());
+ state.value();
+ } finally {
+ backend.close();
+ backend.dispose();
+ }
+ }
+
/**
* Tests {@link ValueState#value()} and
* {@link InternalKvState#getSerializedValue(byte[], TypeSerializer,
TypeSerializer, TypeSerializer)}