dawidwys edited a comment on pull request #19219:
URL: https://github.com/apache/flink/pull/19219#issuecomment-1084641245


   I second @pnowojski's opinion that serializers should be considered not thread-safe and used accordingly. Whenever we pass a serializer to another thread, it should be a duplicated instance, and once we pass it, we should cede ownership of it to that thread.
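
   A minimal sketch of that ownership rule, assuming a toy `StatefulSerializer` class (hypothetical, not Flink's `TypeSerializer`; only the `duplicate()` idea mirrors the discussion). The caller duplicates before the handoff, the worker thread only ever touches its own copy, and the caller keeps using the original:

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical demo class; not part of Flink.
   class OwnershipHandoff {
       static String[] run() {
           StatefulSerializer mine = new StatefulSerializer();
           // Duplicate BEFORE handing off; from here on only the worker uses `handedOff`.
           StatefulSerializer handedOff = mine.duplicate();
           CompletableFuture<String> remote =
                   CompletableFuture.supplyAsync(() -> handedOff.serialize(42));
           String local = mine.serialize(7); // no other thread touches `mine`
           return new String[] {local, remote.join()};
       }

       public static void main(String[] args) {
           String[] r = run();
           System.out.println(r[0] + " " + r[1]); // prints "v=7 v=42"
       }
   }

   // Hypothetical stand-in for a stateful, non-thread-safe serializer.
   class StatefulSerializer {
       // Mutable scratch state: sharing one instance across threads would race on it.
       private final StringBuilder scratch = new StringBuilder();

       String serialize(int value) {
           scratch.setLength(0);
           scratch.append("v=").append(value);
           return scratch.toString();
       }

       // Returns an independent instance with its own scratch state.
       StatefulSerializer duplicate() {
           return new StatefulSerializer();
       }
   }
   ```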
   
   In this particular case, I believe the problem is in `org.apache.flink.runtime.operators.BatchTask#initInputLocalStrategy:1010`. We pass a serializer via `BatchTask#inputIterators` to the `ExternalSorter`, which spawns an additional thread for reading/sorting. At the same time, the implementation of `org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory#getSerializer` does not call `duplicate()` the first time it is called, so we hand the original serializer to the new `ExternalSorter` thread. Later, in `BatchTask#run`, we call `Driver#prepare`, which calls `duplicate()` on that same original serializer while the sorter thread may still be using it, and this causes the `ConcurrentModificationException`.
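
   A simplified, hypothetical model of the "first call returns the original" behaviour described above (illustrative names `Ser` and `FirstCallFactory`; this is not the actual Flink source). It only shows that the first caller, here the sorter thread, ends up sharing the factory's own instance, which later calls then duplicate:

   ```java
   // Hypothetical demo; models the sharing, not the race itself.
   class FirstCallFactoryDemo {
       public static void main(String[] args) {
           Ser original = new Ser();
           FirstCallFactory factory = new FirstCallFactory(original);
           Ser forSorterThread = factory.getSerializer(); // first call: the original itself
           Ser forDriver = factory.getSerializer();       // later call: a duplicate of `original`
           System.out.println(forSorterThread == original); // prints "true"
           System.out.println(forDriver == original);       // prints "false"
       }
   }

   // Hypothetical serializer with a duplicate() method.
   class Ser {
       Ser duplicate() {
           return new Ser();
       }
   }

   // Hypothetical factory mimicking a "first serializer" flag.
   class FirstCallFactory {
       private final Ser serializer;
       private boolean firstSerializer = true;

       FirstCallFactory(Ser serializer) {
           this.serializer = serializer;
       }

       Ser getSerializer() {
           if (firstSerializer) {
               firstSerializer = false;
               return serializer; // the shared instance escapes to whoever asked first
           }
           // duplicate() reads the state of `serializer`; if another thread is using
           // that same instance concurrently, this is where a race can surface.
           return serializer.duplicate();
       }
   }
   ```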
   
   I suggest removing the logic around `RuntimeSerializerFactory#firstSerializer` and always returning a duplicate:
   ```java
       @Override
       public TypeSerializer<T> getSerializer() {
           if (this.serializer != null) {
               // Always hand out a fresh copy, never the factory's own instance,
               // so each caller (and thread) owns the serializer it receives.
               return this.serializer.duplicate();
           } else {
               throw new RuntimeException(
                       "SerializerFactory has not been initialized from configuration.");
           }
       }
   ```
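
   The snippet above is a method excerpt. A self-contained toy model of the same behaviour, assuming hypothetical `DupSer`/`DupFactory` classes in place of `TypeSerializer`/`RuntimeSerializerFactory`, shows that every consumer (sorter thread or driver) gets its own copy and that an unconfigured factory fails fast:

   ```java
   // Hypothetical demo of the "always duplicate" factory.
   class AlwaysDuplicateDemo {
       public static void main(String[] args) {
           DupSer template = new DupSer();
           DupFactory factory = new DupFactory();
           factory.configure(template);
           DupSer forSorter = factory.getSerializer(); // owned by the sorter thread
           DupSer forDriver = factory.getSerializer(); // owned by the task thread
           System.out.println(
                   forSorter != template && forDriver != template && forSorter != forDriver); // prints "true"
       }
   }

   // Hypothetical serializer with a duplicate() method.
   class DupSer {
       DupSer duplicate() {
           return new DupSer();
       }
   }

   // Hypothetical factory; null until configured.
   class DupFactory {
       private DupSer serializer;

       void configure(DupSer serializer) {
           this.serializer = serializer;
       }

       DupSer getSerializer() {
           if (serializer == null) {
               throw new RuntimeException(
                       "SerializerFactory has not been initialized from configuration.");
           }
           return serializer.duplicate(); // never hands out the template instance itself
       }
   }
   ```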
   

