[
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523622#comment-16523622
]
ASF GitHub Bot commented on FLINK-9513:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6196#discussion_r198115714
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * <p>This class serializes a list of objects
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+ private final List<TypeSerializer> originalSerializers;
+
+ protected CompositeSerializer(List<TypeSerializer> originalSerializers)
{
+ Preconditions.checkNotNull(originalSerializers);
+ this.originalSerializers = originalSerializers;
+ }
+
+ protected abstract T composeValue(List values);
+
+ protected abstract List decomposeValue(T v);
+
+ protected abstract CompositeSerializer<T>
createSerializerInstance(List<TypeSerializer> originalSerializers);
+
+ private T composeValueInternal(List values) {
+ Preconditions.checkArgument(values.size() ==
originalSerializers.size());
+ return composeValue(values);
+ }
+
+ private List decomposeValueInternal(T v) {
+ List values = decomposeValue(v);
+ Preconditions.checkArgument(values.size() ==
originalSerializers.size());
+ return values;
+ }
+
+ private CompositeSerializer<T>
createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
+ Preconditions.checkArgument(originalSerializers.size() ==
originalSerializers.size());
+ return createSerializerInstance(originalSerializers);
+ }
+
+ @Override
+ public CompositeSerializer<T> duplicate() {
+ return
createSerializerInstanceInternal(originalSerializers.stream()
+ .map(TypeSerializer::duplicate)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+ }
+
+ @Override
+ public T createInstance() {
+ return composeValueInternal(originalSerializers.stream()
+ .map(TypeSerializer::createInstance)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public T copy(T from) {
+ List originalValues = decomposeValueInternal(from);
+ return composeValueInternal(
+ IntStream.range(0, originalSerializers.size())
+ .mapToObj(i ->
originalSerializers.get(i).copy(originalValues.get(i)))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ List originalFromValues = decomposeValueInternal(from);
+ List originalReuseValues = decomposeValueInternal(reuse);
+ return composeValueInternal(
+ IntStream.range(0, originalSerializers.size())
+ .mapToObj(i ->
originalSerializers.get(i).copy(originalFromValues.get(i),
originalReuseValues.get(i)))
--- End diff --
+1
> Wrap state binder with TTL logic
> --------------------------------
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and
> the expiration timestamp (maybe meta data in future) and use the new object
> as a value in the existing implementations:
> {code:java}
> class TtlValue<V> {
> V value;
> long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
> bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
> StateBinder binder;
> ProcessingTimeProvier timeProvider; // System.currentTimeMillis()
> <V> TtlValueState<V> createValueState(valueDesc) {
> serializer = new TtlValueSerializer(valueDesc.getSerializer);
> ttlValueDesc = new ValueDesc(serializer, ...);
> // or implement custom TypeInfo
> originalStateWithTtl = binder.createValueState(valueDesc);
> return new TtlValueState(originalStateWithTtl, timeProvider);
> }
> // List, Map, ...
> }
> {code}
> TTL serializer should add expiration timestamp
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)