GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182776735
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
    @@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
         * Copy-constructor that does not copy transient fields. They will be initialized once required.
         */
        protected KryoSerializer(KryoSerializer<T> toCopy) {
    -           defaultSerializers = toCopy.defaultSerializers;
    -           defaultSerializerClasses = toCopy.defaultSerializerClasses;
     
    -           kryoRegistrations = toCopy.kryoRegistrations;
    +           this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
    +           this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
    +           this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
    +           this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
    +
    +           // deep copy the serializer instances in defaultSerializers
    +           for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
    +                   toCopy.defaultSerializers.entrySet()) {
     
    -           type = toCopy.type;
    -           if(type == null){
    -                   throw new NullPointerException("Type class cannot be null.");
    +                   this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
    +           }
    +
    +           // deep copy the serializer instances in kryoRegistrations
    +           for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
    --- End diff --
    
    The problem is that we don't have the `ExecutionConfig` in the copy constructor.
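
Because the copy constructor only receives the serializer being copied, any fresh serializer instances have to be derived from the maps that instance already holds. The diff does not show the body of `deepCopySerializer`, so the following is only a minimal sketch of one way such a copy could work, assuming the serializer instances are `java.io.Serializable` and cloning them through a plain Java serialization round trip. `DeepCopySketch`, `StatefulSerializer`, `deepCopy`, and `deepCopyValues` are hypothetical names used for illustration and are not part of Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

final class DeepCopySketch {

    /** Hypothetical stand-in for a stateful, user-provided serializer instance. */
    static class StatefulSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        int counter;

        StatefulSerializer(int counter) {
            this.counter = counter;
        }
    }

    /** Clones a Serializable object by writing it to bytes and reading it back. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deep copy " + original, e);
        }
    }

    /** Builds a new insertion-ordered map whose values are independent copies. */
    static <K, V extends Serializable> LinkedHashMap<K, V> deepCopyValues(Map<K, V> source) {
        LinkedHashMap<K, V> copy = new LinkedHashMap<>(source.size());
        for (Map.Entry<K, V> entry : source.entrySet()) {
            copy.put(entry.getKey(), deepCopy(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<Class<?>, StatefulSerializer> original = new LinkedHashMap<>();
        original.put(String.class, new StatefulSerializer(1));

        Map<Class<?>, StatefulSerializer> copy = deepCopyValues(original);

        // Mutating the copy must not leak into the original instance.
        copy.get(String.class).counter = 99;
        System.out.println("distinct instances: "
                + (copy.get(String.class) != original.get(String.class)));
        System.out.println("original counter: " + original.get(String.class).counter);
    }
}
```

The keys are shared between the two maps while the values are cloned, which mirrors the loops in the diff: each duplicated serializer gets its own serializer instances without needing the `ExecutionConfig` again.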

