dawidwys commented on pull request #19219:
URL: https://github.com/apache/flink/pull/19219#issuecomment-1084641245
I second @pnowojski's opinion that serializers should be considered not
thread-safe and used accordingly. Whenever we pass a serializer to another
thread, it should always be a duplicate of the serializer, and once we pass
it, we should cede ownership to that thread.
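To illustrate the hand-off rule, here is a minimal sketch. `ScratchSerializer` and `HandoffSketch` are hypothetical stand-ins I made up for this comment, not Flink classes; the only assumption is that `duplicate()` returns an instance sharing no mutable state with the original.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a stateful serializer; NOT Flink's TypeSerializer.
final class ScratchSerializer {
    // Mutable scratch state is what makes sharing one instance across threads unsafe.
    private final StringBuilder scratch = new StringBuilder();

    String serialize(int value) {
        scratch.setLength(0);
        scratch.append("v=").append(value);
        return scratch.toString();
    }

    // Returns an independent instance with its own scratch buffer.
    ScratchSerializer duplicate() {
        return new ScratchSerializer();
    }
}

final class HandoffSketch {
    // Serializes `value` on a worker thread that owns a private duplicate.
    static String serializeOnWorker(ScratchSerializer original, int value) {
        // Duplicate on the calling thread; after the hand-off only the worker uses the copy.
        final ScratchSerializer forWorker = original.duplicate();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = worker.submit(() -> forWorker.serialize(value));
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("worker failed", e);
        } finally {
            worker.shutdown();
        }
    }
}
```

The caller keeps using `original` and never touches `forWorker` again, so neither thread can observe the other's scratch state.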
In this particular case I believe the problem is in
`org.apache.flink.runtime.operators.BatchTask#initInputLocalStrategy:1010`. We
pass a serializer to the `ExternalSorter`, which spawns an additional thread
for reading/sorting. At the same time, the implementation of
`org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory#getSerializer`
does not call `duplicate()` the first time it is called. Thus we pass the
original serializer to a new thread of the `ExternalSorter`. Later on, in
`BatchTask#run`, we call `Driver#prepare`, which calls `duplicate()` while the
sorter thread may still be using the original, and that causes the
`ConcurrentModificationException`.
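The first-call behaviour described above can be modelled roughly like this. It is a simplified sketch under my own assumptions: `ModelSerializer` and `FirstCallFactory` are hypothetical names, not the actual `RuntimeSerializerFactory` code.

```java
// Hypothetical stand-ins; these are NOT Flink's classes.
final class ModelSerializer {
    ModelSerializer duplicate() {
        return new ModelSerializer();
    }
}

final class FirstCallFactory {
    private final ModelSerializer serializer = new ModelSerializer();
    private boolean first = true;

    // The first caller gets the original instance; later callers get duplicates.
    ModelSerializer getSerializer() {
        if (first) {
            first = false;
            return serializer;
        }
        return serializer.duplicate();
    }

    // Helper for the sketch only: tells whether `s` is the factory's original.
    boolean isOriginal(ModelSerializer s) {
        return s == serializer;
    }

    // What a caller can do before handing a serializer to another thread:
    // duplicate explicitly, regardless of which call this happens to be.
    ModelSerializer getSerializerForOtherThread() {
        return getSerializer().duplicate();
    }
}
```

With such a factory, whoever calls `getSerializer()` first ends up holding the original, so an explicit `duplicate()` at the hand-off is the only way for the caller to be sure the other thread gets a private copy.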
I'd suggest fixing the issue in `BatchTask` by calling `duplicate()`
explicitly:
```
private void initInputLocalStrategy(int inputNum) throws Exception {
    ....
    case SORT:
        @SuppressWarnings({"rawtypes", "unchecked"})
        Sorter<?> sorter =
                ExternalSorter.newBuilder(
                                getMemoryManager(),
                                this,
                                // we must duplicate the serializer as it will be used
                                // in a reading thread of the sorter
                                this.inputSerializers[inputNum].getSerializer().duplicate(),
                                getLocalStrategyComparator(inputNum))
        ....
}
```