This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new f45f04b  [FLINK-10095][state] Swap serialization order in TTL value: 
first timestamp then user value
f45f04b is described below

commit f45f04bcaf6f223e9000b0f012dc0c8af3f9063e
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Mon Aug 6 14:55:27 2018 +0200

    [FLINK-10095][state] Swap serialization order in TTL value: first timestamp 
then user value
    
    This closes #6510.
    
    (cherry picked from commit b535fab4c4529a15e50174a30c3743275cedaab4)
---
 .../flink/runtime/state/ttl/TtlStateFactory.java   |  9 +--
 .../state/ttl/TtlStateSnapshotTransformer.java     | 16 ++---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  4 ++
 .../contrib/streaming/state/RocksDBMapState.java   | 75 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 303285a..2f291d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -190,9 +190,10 @@ public class TtlStateFactory<N, SV, S extends State, IS 
extends S> {
 
        /** Serializer for user state value with TTL. */
        private static class TtlSerializer<T> extends 
CompositeSerializer<TtlValue<T>> {
+               private static final long serialVersionUID = 
131020282727167064L;
 
                TtlSerializer(TypeSerializer<T> userValueSerializer) {
-                       super(true, userValueSerializer, 
LongSerializer.INSTANCE);
+                       super(true, LongSerializer.INSTANCE, 
userValueSerializer);
                }
 
                TtlSerializer(PrecomputedParameters precomputed, 
TypeSerializer<?> ... fieldSerializers) {
@@ -203,7 +204,7 @@ public class TtlStateFactory<N, SV, S extends State, IS 
extends S> {
                @Override
                public TtlValue<T> createInstance(@Nonnull Object ... values) {
                        Preconditions.checkArgument(values.length == 2);
-                       return new TtlValue<>((T) values[0], (long) values[1]);
+                       return new TtlValue<>((T) values[1], (long) values[0]);
                }
 
                @Override
@@ -213,7 +214,7 @@ public class TtlStateFactory<N, SV, S extends State, IS 
extends S> {
 
                @Override
                protected Object getField(@Nonnull TtlValue<T> v, int index) {
-                       return index == 0 ? v.getUserValue() : 
v.getLastAccessTimestamp();
+                       return index == 0 ? v.getLastAccessTimestamp() : 
v.getUserValue();
                }
 
                @SuppressWarnings("unchecked")
@@ -223,7 +224,7 @@ public class TtlStateFactory<N, SV, S extends State, IS 
extends S> {
                        TypeSerializer<?> ... originalSerializers) {
                        Preconditions.checkNotNull(originalSerializers);
                        Preconditions.checkArgument(originalSerializers.length 
== 2);
-                       return new TtlSerializer<>(precomputed, 
(TypeSerializer<T>) originalSerializers[0]);
+                       return new TtlSerializer<>(precomputed, 
(TypeSerializer<T>) originalSerializers[1]);
                }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java
index 228d045..1ee05cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateSnapshotTransformer.java
@@ -19,16 +19,14 @@
 package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import 
org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.Optional;
 
@@ -36,10 +34,12 @@ import java.util.Optional;
 abstract class TtlStateSnapshotTransformer<T> implements 
CollectionStateSnapshotTransformer<T> {
        private final TtlTimeProvider ttlTimeProvider;
        final long ttl;
+       private final ByteArrayDataInputView div;
 
        TtlStateSnapshotTransformer(@Nonnull TtlTimeProvider ttlTimeProvider, 
long ttl) {
                this.ttlTimeProvider = ttlTimeProvider;
                this.ttl = ttl;
+               this.div = new ByteArrayDataInputView();
        }
 
        <V> TtlValue<V> filterTtlValue(TtlValue<V> value) {
@@ -54,10 +54,9 @@ abstract class TtlStateSnapshotTransformer<T> implements 
CollectionStateSnapshot
                return TtlUtils.expired(ts, ttl, ttlTimeProvider);
        }
 
-       private static long deserializeTs(
-               byte[] value, int offset) throws IOException {
-               return LongSerializer.INSTANCE.deserialize(
-                       new DataInputViewStreamWrapper(new 
ByteArrayInputStream(value, offset, Long.BYTES)));
+       long deserializeTs(byte[] value) throws IOException {
+               div.setData(value, 0, Long.BYTES);
+               return LongSerializer.INSTANCE.deserialize(div);
        }
 
        @Override
@@ -88,10 +87,9 @@ abstract class TtlStateSnapshotTransformer<T> implements 
CollectionStateSnapshot
                        if (value == null) {
                                return null;
                        }
-                       Preconditions.checkArgument(value.length >= Long.BYTES);
                        long ts;
                        try {
-                               ts = deserializeTs(value, value.length - 
Long.BYTES);
+                               ts = deserializeTs(value);
                        } catch (IOException e) {
                                throw new FlinkRuntimeException("Unexpected 
timestamp deserialization failure");
                        }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f7af354..17ba985 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1400,6 +1400,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                if (stateDesc instanceof ListStateDescriptor) {
                        Optional<StateSnapshotTransformer<SEV>> original = 
snapshotTransformFactory.createForDeserializedState();
                        return original.map(est -> 
createRocksDBListStateTransformer(stateDesc, est)).orElse(null);
+               } else if (stateDesc instanceof MapStateDescriptor) {
+                       Optional<StateSnapshotTransformer<byte[]>> original = 
snapshotTransformFactory.createForSerializedState();
+                       return (StateSnapshotTransformer<SV>) original
+                               
.map(RocksDBMapState.StateSnapshotTransformerWrapper::new).orElse(null);
                } else {
                        Optional<StateSnapshotTransformer<byte[]>> original = 
snapshotTransformFactory.createForSerializedState();
                        return (StateSnapshotTransformer<SV>) 
original.orElse(null);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 4ec1f77..b08eade 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -31,6 +33,7 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -44,9 +47,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -625,4 +630,74 @@ class RocksDBMapState<K, N, UK, UV>
                        (Map<UK, UV>) stateDesc.getDefaultValue(),
                        backend);
        }
+
+       /**
+        * RocksDB map state specific byte value transformer wrapper.
+        *
+        * <p>This specific transformer wrapper checks the first byte to detect 
null user value entries
+        * and if not null forward the rest of byte array to the original byte 
value transformer.
+        */
+       static class StateSnapshotTransformerWrapper implements 
StateSnapshotTransformer<byte[]> {
+               private static final byte[] NULL_VALUE;
+               private static final byte NON_NULL_VALUE_PREFIX;
+               static {
+                       ByteArrayDataOutputView dov = new 
ByteArrayDataOutputView(1);
+                       try {
+                               dov.writeBoolean(true);
+                               NULL_VALUE = dov.toByteArray();
+                               dov.reset();
+                               dov.writeBoolean(false);
+                               NON_NULL_VALUE_PREFIX = dov.toByteArray()[0];
+                       } catch (IOException e) {
+                               throw new FlinkRuntimeException("Failed to 
serialize boolean flag of map user null value", e);
+                       }
+               }
+
+               private final StateSnapshotTransformer<byte[]> 
elementTransformer;
+               private final ByteArrayDataInputView div;
+
+               
StateSnapshotTransformerWrapper(StateSnapshotTransformer<byte[]> 
originalTransformer) {
+                       this.elementTransformer = originalTransformer;
+                       this.div = new ByteArrayDataInputView();
+               }
+
+               @Override
+               @Nullable
+               public byte[] filterOrTransform(@Nullable byte[] value) {
+                       if (value == null || isNull(value)) {
+                               return NULL_VALUE;
+                       } else {
+                               // we have to skip the first byte indicating 
null user value
+                               // TODO: optimization here could be to work 
with slices and not byte arrays
+                               // and copy slice sub-array only when needed
+                               byte[] woNullByte = Arrays.copyOfRange(value, 
1, value.length);
+                               byte[] filteredValue = 
elementTransformer.filterOrTransform(woNullByte);
+                               if (filteredValue == null) {
+                                       filteredValue = NULL_VALUE;
+                               } else if (filteredValue != woNullByte) {
+                                       filteredValue = 
prependWithNonNullByte(filteredValue, value);
+                               } else {
+                                       filteredValue = value;
+                               }
+                               return filteredValue;
+                       }
+               }
+
+               private boolean isNull(byte[] value) {
+                       try {
+                               div.setData(value, 0, 1);
+                               return div.readBoolean();
+                       } catch (IOException e) {
+                               throw new FlinkRuntimeException("Failed to 
deserialize boolean flag of map user null value", e);
+                       }
+               }
+
+               private static byte[] prependWithNonNullByte(byte[] value, 
byte[] reuse) {
+                       int len = 1 + value.length;
+                       byte[] result = reuse.length == len ? reuse : new 
byte[len];
+                       result[0] = NON_NULL_VALUE_PREFIX;
+                       System.arraycopy(value, 0, result, 1, value.length);
+                       return result;
+               }
+       }
 }

Reply via email to