[
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444066#comment-16444066
]
ASF GitHub Bot commented on FLINK-8836:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5880#discussion_r182749960
--- Diff:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
---
@@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
- defaultSerializers = toCopy.defaultSerializers;
- defaultSerializerClasses = toCopy.defaultSerializerClasses;
- kryoRegistrations = toCopy.kryoRegistrations;
+ this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+ this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+ this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+ this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+ // deep copy the serializer instances in defaultSerializers
+ for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
+ toCopy.defaultSerializers.entrySet()) {
- type = toCopy.type;
- if(type == null){
- throw new NullPointerException("Type class cannot be null.");
+ this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+ }
+
+ // deep copy the serializer instances in kryoRegistrations
+ for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
--- End diff --
One alternative to this loop (though I'm not sure it would be better) is to
always make a copy of the `ExecutionConfig.SerializableSerializer` in the
`buildKryoRegistrations` method when instantiating its corresponding
`KryoRegistration`.
See
https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537.
There, we can already make the copy while building the registrations.
Then, when duplicating the `KryoSerializer`, duplicating the registrations
is only a matter of calling `buildKryoRegistrations` again from the
execution config, because that method would already handle stateful
serializer registrations properly.
IMO, this seems like a cleaner solution. What do you think?
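To make the idea concrete, here is a rough sketch of copying at registration-build time. It is not Flink's actual code: `CopyOnBuildSketch`, `CountingSerializer`, `Registration`, `deepCopy`, and `buildRegistrations` are hypothetical stand-ins, and the copy goes through plain Java serialization on the assumption that the registered serializer instances are `Serializable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

public class CopyOnBuildSketch {

    /** Hypothetical stateful serializer; stands in for a user-registered instance. */
    static class CountingSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        int calls = 0;
    }

    /** Hypothetical stand-in for a registration that holds a serializer instance. */
    static class Registration {
        final CountingSerializer serializer;

        Registration(CountingSerializer serializer) {
            this.serializer = serializer;
        }
    }

    /** Deep-copies a Serializable object via a serialization round trip. */
    @SuppressWarnings("unchecked")
    static <S extends Serializable> S deepCopy(S original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (S) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy serializer instance", e);
        }
    }

    /** Builds registrations from the configured instances, copying each instance. */
    static Map<String, Registration> buildRegistrations(
            Map<String, CountingSerializer> configured) {
        Map<String, Registration> result = new LinkedHashMap<>(configured.size());
        for (Map.Entry<String, CountingSerializer> entry : configured.entrySet()) {
            // The copy happens here, so every call yields independent instances.
            result.put(entry.getKey(), new Registration(deepCopy(entry.getValue())));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, CountingSerializer> configured = new LinkedHashMap<>();
        configured.put("com.example.MyType", new CountingSerializer());

        // "Duplicating" is just building the registrations a second time.
        Map<String, Registration> first = buildRegistrations(configured);
        Map<String, Registration> second = buildRegistrations(configured);

        first.get("com.example.MyType").serializer.calls++;
        System.out.println("first.calls  = " + first.get("com.example.MyType").serializer.calls);
        System.out.println("second.calls = " + second.get("com.example.MyType").serializer.calls);
    }
}
```

With the copy inside the build method, the copy constructor would not need its own deep-copy loop for the registrations; it would only need to rebuild them from the same configuration.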
> Duplicating a KryoSerializer does not duplicate registered default serializers
> ------------------------------------------------------------------------------
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Stefan Richter
> Priority: Blocker
> Fix For: 1.5.0
>
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the serializer instances
> in {{defaultSerializers}} are passed directly to the new
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different
> {{KryoSerializer}} instances, so the result is not a correct duplicate.