[
https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398262#comment-16398262
]
ASF GitHub Bot commented on FLINK-8802:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r174383199
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateInfo.java ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Metadata about a {@link InternalKvState}. This includes the serializers
for
+ * the key, the namespace, and the values kept in the state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace the state is associated to
+ * @param <V> The type of values kept internally in state
+ */
+public class KvStateInfo<K, N, V> {
+
+ private final TypeSerializer<K> keySerializer;
+ private final TypeSerializer<N> namespaceSerializer;
+ private final TypeSerializer<V> stateValueSerializer;
+
+ public KvStateInfo(
+ final TypeSerializer<K> keySerializer,
+ final TypeSerializer<N> namespaceSerializer,
+ final TypeSerializer<V> stateValueSerializer
+ ) {
+ this.keySerializer = Preconditions.checkNotNull(keySerializer);
+ this.namespaceSerializer =
Preconditions.checkNotNull(namespaceSerializer);
+ this.stateValueSerializer =
Preconditions.checkNotNull(stateValueSerializer);
+ }
+
+ /**
+ * @return The serializer for the key the state is associated to.
+ */
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ /**
+ * @return The serializer for the namespace the state is associated to.
+ */
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ /**
+ * @return The serializer for the values kept in the state.
+ */
+ public TypeSerializer<V> getStateValueSerializer() {
+ return stateValueSerializer;
+ }
+
+ /**
+ * Creates a deep copy of the current {@link KvStateInfo} by duplicating
+ * all the included serializers.
+ *
+ * <p>This method assumes correct implementation of the {@link
TypeSerializer#duplicate()}
+ * method of the included serializers.
+ */
+ public KvStateInfo<K, N, V> duplicate() {
+ final TypeSerializer<K> dupKeySerializer =
keySerializer.duplicate();
+ final TypeSerializer<N> dupNamespaceSerializer =
namespaceSerializer.duplicate();
+ final TypeSerializer<V> dupSVSerializer =
stateValueSerializer.duplicate();
+
+ if (
+ dupKeySerializer == keySerializer &&
--- End diff --
odd formatting
> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Blocker
> Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers
> are not duplicated, which may lead to exceptions thrown when a serializer is
> stateful.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)