dawidwys edited a comment on pull request #19219:
URL: https://github.com/apache/flink/pull/19219#issuecomment-1084641245
I second @pnowojski's opinion that serializers should be considered not
thread-safe and used accordingly. Whenever we pass a serializer to another
thread, it should always be a duplicate of the serializer, and once we pass
it, we should cede ownership to that thread.
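A minimal sketch of that hand-off rule, using a toy stand-in rather than Flink's real classes. `ToySerializer`, `SerializerHandOff` and `serializeOnWorker` are hypothetical names made up for illustration; only the idea of calling `duplicate()` before crossing a thread boundary comes from the discussion above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializerHandOff {

    /** Hypothetical stand-in for a stateful, non-thread-safe serializer. */
    static final class ToySerializer {
        // Mutable scratch buffer: the reason two threads must not share one instance.
        private final StringBuilder scratch = new StringBuilder();

        String serialize(int value) {
            scratch.setLength(0);
            scratch.append("v=").append(value);
            return scratch.toString();
        }

        /** Returns an independent copy with its own scratch buffer. */
        ToySerializer duplicate() {
            return new ToySerializer();
        }
    }

    /** Hands a duplicate to a worker thread; the caller keeps its own instance. */
    static String serializeOnWorker(ToySerializer owned, int value) throws Exception {
        // The worker owns this copy from now on; the caller never touches it again.
        ToySerializer forWorker = owned.duplicate();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = pool.submit(() -> forWorker.serialize(value));
            return result.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ToySerializer mine = new ToySerializer();
        System.out.println(serializeOnWorker(mine, 42));
        // Safe: the worker only ever saw the duplicate, never this instance.
        System.out.println(mine.serialize(7));
    }
}
```

The point of the pattern is that no instance is ever reachable from two threads, so no locking is needed inside the serializer itself.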
In this particular case I believe the problem is in
`org.apache.flink.runtime.operators.BatchTask#initInputLocalStrategy:1010`. We
pass a serializer via `BatchTask#inputIterators` to the `ExternalSorter`,
which spawns an additional thread for reading/sorting. At the same time, the
implementation of
`org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory#getSerializer`
does not call `duplicate()` the first time it is called. Thus we pass the
original serializer to a new thread of the `ExternalSorter`. Later on, in
`BatchTask#run`, we call `Driver#prepare`, which calls `duplicate()` on that
same original instance while the sorter thread may still be using it, and
that causes the `ConcurrentModificationException`.
I suggest removing the logic around
`RuntimeSerializerFactory#firstSerializer` and always duplicating the
serializer:
```
@Override
public TypeSerializer<T> getSerializer() {
    if (this.serializer != null) {
        // Always hand out a duplicate so the original never escapes to a caller.
        return this.serializer.duplicate();
    } else {
        throw new RuntimeException(
                "SerializerFactory has not been initialized from configuration.");
    }
}
```
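To illustrate the difference, here is a self-contained toy model, not the actual Flink code. `ToySerializer`, `FirstCallSharesFactory` and `AlwaysDuplicateFactory` are hypothetical names; the first factory only approximates the "no `duplicate()` on the first call" behaviour described above, and the second mirrors the suggested change.

```java
public class FactoryDuplicationSketch {

    /** Hypothetical minimal serializer; only object identity matters here. */
    static final class ToySerializer {
        ToySerializer duplicate() {
            return new ToySerializer();
        }
    }

    /** Assumed model of the current behaviour: the first call returns the original. */
    static final class FirstCallSharesFactory {
        private final ToySerializer serializer;
        private boolean firstSerializer = true;

        FirstCallSharesFactory(ToySerializer serializer) {
            this.serializer = serializer;
        }

        ToySerializer getSerializer() {
            if (firstSerializer) {
                firstSerializer = false;
                return serializer; // the original instance escapes to the first caller
            }
            return serializer.duplicate();
        }
    }

    /** Model of the suggested change: every caller gets its own duplicate. */
    static final class AlwaysDuplicateFactory {
        private final ToySerializer serializer;

        AlwaysDuplicateFactory(ToySerializer serializer) {
            this.serializer = serializer;
        }

        ToySerializer getSerializer() {
            return serializer.duplicate();
        }
    }

    public static void main(String[] args) {
        ToySerializer original = new ToySerializer();

        FirstCallSharesFactory before = new FirstCallSharesFactory(original);
        System.out.println("first call shares original: "
                + (before.getSerializer() == original));

        AlwaysDuplicateFactory after = new AlwaysDuplicateFactory(original);
        System.out.println("always-duplicate shares original: "
                + (after.getSerializer() == original));
    }
}
```

With the first factory, whoever calls `getSerializer()` first ends up holding the very instance that later `duplicate()` calls read from, which is the sharing that the suggested change removes.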