[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405713#comment-16405713 ]
ASF GitHub Bot commented on FLINK-8802:
---------------------------------------
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r175640731
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalQueryableKvState.java ---
@@ -16,46 +16,42 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.query;
+package org.apache.flink.runtime.state.internal;
-import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.query.KvStateInfo;
import org.apache.flink.util.Preconditions;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
- * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}.
+ * An abstract base class to be subclassed by states that are expected to be queryable.
+ * Its main task is to keep a "thread-local" copy of the different serializers (if needed).
*
* @param <K> The type of key the state is associated to
* @param <N> The type of the namespace the state is associated to
* @param <V> The type of values kept internally in state
*/
-@Internal
-public class KvStateEntry<K, N, V> {
+public abstract class InternalQueryableKvState<K, N, V> implements InternalKvState<N> {
- private final InternalKvState<K, N, V> state;
private final KvStateInfo<K, N, V> stateInfo;
-
private final boolean areSerializersStateless;
- private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
+ private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache = new ConcurrentHashMap<>(4);
--- End diff --
nit: just wondering why this doesn't use the `ThreadLocal<KvStateInfo<K, N, V>>`
provided by the JDK instead of a `ConcurrentMap` keyed by `Thread`?
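
For comparison, here is a minimal sketch of the two per-thread caching shapes
being discussed. It is not the code from the pull request: `SerializerBundle`
is a hypothetical stand-in for `KvStateInfo<K, N, V>`, and
`PerThreadCacheSketch` with its methods is invented for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for a per-thread set of serializers (not Flink's KvStateInfo). */
final class SerializerBundle {
    // Records which thread created this copy, purely for demonstration.
    final String createdBy = Thread.currentThread().getName();
}

final class PerThreadCacheSketch {

    // Variant 1: a map keyed by the calling thread, the shape used in the diff.
    // An entry stays in the map, and keeps its Thread reachable, until it is removed.
    static final ConcurrentMap<Thread, SerializerBundle> BY_MAP = new ConcurrentHashMap<>(4);

    static SerializerBundle fromMap() {
        return BY_MAP.computeIfAbsent(Thread.currentThread(), t -> new SerializerBundle());
    }

    // Variant 2: the JDK's ThreadLocal. Each thread lazily gets its own copy, and
    // that copy becomes collectable once the thread has terminated.
    static final ThreadLocal<SerializerBundle> BY_THREAD_LOCAL =
            ThreadLocal.withInitial(SerializerBundle::new);

    static SerializerBundle fromThreadLocal() {
        return BY_THREAD_LOCAL.get();
    }

    /** Runs the task on a fresh thread and returns its result (demo helper). */
    static <T> T callOnNewThread(Supplier<T> task) {
        AtomicReference<T> result = new AtomicReference<>();
        Thread worker = new Thread(() -> result.set(task.get()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        SerializerBundle mine = fromThreadLocal();
        SerializerBundle theirs = callOnNewThread(PerThreadCacheSketch::fromThreadLocal);
        System.out.println("same thread reuses its copy: " + (mine == fromThreadLocal()));
        System.out.println("other thread gets its own copy: " + (mine != theirs));
    }
}
```

Both variants hand each thread its own instance. The difference is mostly in
cleanup: the map needs explicit removal of entries for threads that are gone,
while `ThreadLocal` ties the copy's lifetime to the thread.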
> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Blocker
> Fix For: 1.5.0
>
>
> `getSerializedValue()` may be called by multiple threads, but the serializers
> are not duplicated per thread, which may lead to exceptions when a serializer
> is stateful.
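
To illustrate why duplication matters only for stateful serializers, here is a
small sketch under stated assumptions. `SketchSerializer` and both
implementations are hypothetical and are not Flink's `TypeSerializer`; they
only mirror the idea that `duplicate()` may return the same instance for a
stateless serializer and must return a fresh one for a stateful serializer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified serializer contract (not Flink's TypeSerializer). */
interface SketchSerializer {
    byte[] serialize(String value);

    /** Returns an instance that is safe to use from another thread. */
    SketchSerializer duplicate();
}

/** Holds no mutable fields, so one instance can be shared across threads. */
final class StatelessSketchSerializer implements SketchSerializer {
    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SketchSerializer duplicate() {
        return this; // nothing to protect, sharing is fine
    }
}

/** Reuses an internal scratch buffer, which makes it unsafe to share. */
final class BufferingSketchSerializer implements SketchSerializer {
    private final ByteArrayOutputStream scratch = new ByteArrayOutputStream();

    @Override
    public byte[] serialize(String value) {
        // reset + write + toByteArray is not atomic: two threads interleaving
        // here could produce corrupted output.
        scratch.reset();
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        scratch.write(bytes, 0, bytes.length);
        return scratch.toByteArray();
    }

    @Override
    public SketchSerializer duplicate() {
        return new BufferingSketchSerializer(); // fresh buffer per copy
    }
}
```

A caller that serves queries from several threads would call `duplicate()`
once per thread and keep the result for that thread, rather than sharing the
original instance.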