Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r174473498
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
---
@@ -70,10 +88,18 @@
* <p>If no value is associated with key and namespace, <code>null</code>
* is returned.
*
+ * <p><b>TO IMPLEMENTERS:</b> This method is called by multiple threads. Anything
+ * stateful (e.g. serializers) should be either duplicated or protected from undesired
+ * consequences of concurrent invocations.
+ *
* @param serializedKeyAndNamespace Serialized key and namespace
* @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
*
* @throws Exception Exceptions during serialization are forwarded
*/
- byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
+ byte[] getSerializedValue(
--- End diff --
I would suggest storing the serializer in a thread-local variable. The
current solution is a bit confusing because this interface suddenly exposes
serializers, and callers have to provide a serializer to the
`getSerializedValue` method. In my opinion, the interface does not make much
sense this way. Furthermore, the serializers are copied externally into
something that looks like a custom-built thread local. I suggest keeping the
serializers in thread locals in the base class and bringing this interface
back to its original form. There is also only one thread pool, dedicated to
queryable state, that would hold the serializers, and even the current
solution has a dedicated cleanup method. In that place, we can just clean up
the thread locals.
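
To make the suggestion concrete, here is a minimal sketch of what I have in
mind. It is not the actual Flink code: `Serializer`, `StringSerializer`,
`AbstractKvState`, `lookup` and `releaseThreadLocalSerializer` are hypothetical
stand-ins I made up for illustration, and the real base class and serializer
types would differ.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a stateful serializer that is not thread-safe.
interface Serializer<T> {
    byte[] serialize(T value);

    // Returns an independent copy that can be used by another thread.
    Serializer<T> duplicate();
}

// Hypothetical serializer with mutable scratch state, so sharing one
// instance across threads would be unsafe.
final class StringSerializer implements Serializer<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public byte[] serialize(String value) {
        scratch.setLength(0);
        scratch.append(value);
        return scratch.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Serializer<String> duplicate() {
        return new StringSerializer();
    }
}

// Hypothetical base class: the serializer stays an internal detail, so the
// interface method keeps its original single-argument form.
abstract class AbstractKvState<V> {
    // Each calling thread lazily receives its own duplicate of the prototype.
    private final ThreadLocal<Serializer<V>> threadLocalSerializer;

    AbstractKvState(Serializer<V> prototype) {
        this.threadLocalSerializer = ThreadLocal.withInitial(prototype::duplicate);
    }

    // Assumed lookup hook; returns null if nothing is stored for the key.
    protected abstract V lookup(byte[] serializedKeyAndNamespace);

    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) {
        V value = lookup(serializedKeyAndNamespace);
        return value == null ? null : threadLocalSerializer.get().serialize(value);
    }

    // Drops the calling thread's serializer copy; other threads are unaffected.
    public void releaseThreadLocalSerializer() {
        threadLocalSerializer.remove();
    }
}

class ThreadLocalSerializerSketch {
    public static void main(String[] args) {
        AbstractKvState<String> state =
            new AbstractKvState<String>(new StringSerializer()) {
                @Override
                protected String lookup(byte[] key) {
                    return key.length == 0 ? null : "value-" + key.length;
                }
            };
        byte[] bytes = state.getSerializedValue(new byte[] {1, 2, 3});
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        state.releaseThreadLocalSerializer();
    }
}
```

One caveat with this approach: `ThreadLocal.remove()` only clears the entry of
the thread that calls it, so the cleanup would have to run on each thread of
the queryable state pool (or rely on the pool threads terminating) rather than
once from an arbitrary thread.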
---