[ https://issues.apache.org/jira/browse/FLINK-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622225#comment-16622225 ]

ASF GitHub Bot commented on FLINK-10157:
----------------------------------------

StefanRRichter commented on a change in pull request #6707: [FLINK-10157] [State TTL] Allow `null` user values in map state with TTL
URL: https://github.com/apache/flink/pull/6707#discussion_r219214268
 
 

 ##########
 File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 ##########
 @@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Serializer wrapper to add support of {@code null} value serialization.
+ *
+ * <p>If the target serializer does not support {@code null} values of its type,
+ * you can use this class to wrap this serializer.
+ * This is a generic treatment of {@code null} value serialization
+ * which comes with the cost of additional byte in the final serialized value.
+ * The {@code NullableSerializer} will intercept {@code null} value serialization case
+ * and prepend the target serialized value with a boolean flag marking whether it is {@code null} or not.
+ * <pre> {@code
+ * TypeSerializer<T> originalSerializer = ...;
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrap(originalSerializer);
+ * // or
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer);
+ * }}</pre>
+ *
+ * @param <T> type to serialize
+ */
+public class NullableSerializer<T> extends TypeSerializer<T> {
+       private static final long serialVersionUID = 3335569358214720033L;
+
+       @Nonnull
+       private final TypeSerializer<T> originalSerializer;
+       private final boolean padNullValue;
+       private final byte[] padding;
+
+       private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
+               this.originalSerializer = originalSerializer;
+               this.padNullValue = originalSerializer.getLength() > 0 && padNullValueIfFixedLen;
+               padding = padNullValue ? new byte[originalSerializer.getLength()] : null;
 
 Review comment:
   You could use `byte[0]` for no padding and get rid of the `null` checks as well
as the `padNullValue` flag via a method `isPadding() { return padding.length > 0; }`.
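
   The suggestion above can be sketched outside Flink as follows. This is a
minimal illustration only, not the code from the pull request:
`NullableLongCodec` and its methods are hypothetical names, the codec is
specialized to `Long` so that it runs without Flink's `TypeSerializer`, and
only `padding`, `isPadding` and `padNullValueIfFixedLen` are taken from the
diff and the comment.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for NullableSerializer, specialized to Long (assumption, not Flink code). */
final class NullableLongCodec {
    private static final byte[] NO_PADDING = new byte[0];

    // Never null: an empty array means "no padding", so no separate flag is needed.
    private final byte[] padding;

    NullableLongCodec(boolean padNullValueIfFixedLen) {
        this.padding = padNullValueIfFixedLen ? new byte[Long.BYTES] : NO_PADDING;
    }

    /** Derived from the array length instead of a stored padNullValue field. */
    boolean isPadding() {
        return padding.length > 0;
    }

    byte[] serialize(Long value) {
        int payloadLength = (value == null) ? padding.length : Long.BYTES;
        ByteBuffer buffer = ByteBuffer.allocate(1 + payloadLength);
        buffer.put((byte) (value == null ? 1 : 0)); // one-byte null flag
        if (value == null) {
            buffer.put(padding); // writes nothing when padding is empty
        } else {
            buffer.putLong(value);
        }
        return buffer.array();
    }

    Long deserialize(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        boolean isNull = buffer.get() != 0;
        // Any padding after the flag is simply ignored for a null value.
        return isNull ? null : buffer.getLong();
    }
}
```

   With an empty array, writing the padding is a no-op, so the `null` branch
needs neither a `null` check on `padding` nor a second boolean field.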



> Allow `null` user values in map state with TTL
> ----------------------------------------------
>
>                 Key: FLINK-10157
>                 URL: https://issues.apache.org/jira/browse/FLINK-10157
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>         Environment: Flink:1.6.0
> Scala:2.11
> JDK:1.8
>            Reporter: chengjie.wu
>            Assignee: Andrey Zagrebin
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: StateWithOutTtlTest.scala, StateWithTtlTest.scala
>
>
> Thanks for the StateTtl feature, this is exactly what I need now! But I found
> an issue.
> In the previous version, or when StateTtl is not enabled, MapState allows
> `null` values. That means that after
> {code:java}
> mapState.put("key", null){code}
> the call
> {code:java}
> mapState.contains("key"){code}
> returns {color:#ff0000}*true*{color}, but when StateTtl is enabled,
> {code:java}
> mapState.contains("key"){code}
> returns {color:#ff0000}*false*{color} (*the key has not expired*).
> So I think the field `userValue` in
> `org.apache.flink.runtime.state.ttl.TtlValue` should allow a `null` value. A
> `null` user value does not necessarily mean that the TtlValue itself should be null.
>  
> {code:java}
> /**
>  * This class wraps user value of state with TTL.
>  *
>  * @param <T> Type of the user value of state with TTL
>  */
> class TtlValue<T> implements Serializable {
>     private final T userValue;
>     private final long lastAccessTimestamp;
>
>     TtlValue(T userValue, long lastAccessTimestamp) {
>         Preconditions.checkNotNull(userValue);
>         this.userValue = userValue;
>         this.lastAccessTimestamp = lastAccessTimestamp;
>     }
>
>     T getUserValue() {
>         return userValue;
>     }
>
>     long getLastAccessTimestamp() {
>         return lastAccessTimestamp;
>     }
> }
> {code}
> Is my understanding correct?
> These are my test classes:
> [^StateWithTtlTest.scala] [^StateWithOutTtlTest.scala]
> ^Thanks!:)^
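
The behaviour requested in the issue description above can be sketched in
plain Java. This is an illustration under stated assumptions, not Flink's
implementation: `NullableTtlValue` and `TtlMapSketch` are hypothetical names,
the clock is passed in explicitly, and expiry is reduced to a simple
timestamp comparison.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical TTL wrapper that, unlike the TtlValue quoted above, accepts a null user value. */
final class NullableTtlValue<T> {
    private final T userValue; // may be null
    private final long lastAccessTimestamp;

    NullableTtlValue(T userValue, long lastAccessTimestamp) {
        this.userValue = userValue; // deliberately no checkNotNull here
        this.lastAccessTimestamp = lastAccessTimestamp;
    }

    T getUserValue() {
        return userValue;
    }

    long getLastAccessTimestamp() {
        return lastAccessTimestamp;
    }
}

/** Hypothetical map state with TTL; only the wrapper, never the user value, decides presence. */
final class TtlMapSketch<K, V> {
    private final Map<K, NullableTtlValue<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlMapSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(K key, V value, long now) {
        entries.put(key, new NullableTtlValue<>(value, now));
    }

    boolean contains(K key, long now) {
        NullableTtlValue<V> wrapped = entries.get(key);
        // A missing wrapper means "absent"; a wrapper holding null still counts as present.
        return wrapped != null && now - wrapped.getLastAccessTimestamp() < ttlMillis;
    }
}
```

In this sketch, a key stored with a `null` value is reported as present until
its TTL elapses, which matches the behaviour the reporter observes without
StateTtl.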



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
