Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5880#discussion_r182776735
--- Diff:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
---
@@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig
executionConfig){
* Copy-constructor that does not copy transient fields. They will be
initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
- defaultSerializers = toCopy.defaultSerializers;
- defaultSerializerClasses = toCopy.defaultSerializerClasses;
- kryoRegistrations = toCopy.kryoRegistrations;
+ this.type = checkNotNull(toCopy.type, "Type class cannot be
null.");
+ this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+ this.defaultSerializers = new
LinkedHashMap<>(toCopy.defaultSerializers.size());
+ this.kryoRegistrations = new
LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+ // deep copy the serializer instances in defaultSerializers
+ for (Map.Entry<Class<?>,
ExecutionConfig.SerializableSerializer<?>> entry :
+ toCopy.defaultSerializers.entrySet()) {
- type = toCopy.type;
- if(type == null){
- throw new NullPointerException("Type class cannot be
null.");
+ this.defaultSerializers.put(entry.getKey(),
deepCopySerializer(entry.getValue()));
+ }
+
+ // deep copy the serializer instances in kryoRegistrations
+ for (Map.Entry<String, KryoRegistration> entry :
toCopy.kryoRegistrations.entrySet()) {
--- End diff --
The problem is that we don't have the `ExecutionConfig` in the copy
constructor.
---