kimgr opened a new pull request, #25340:
URL: https://github.com/apache/flink/pull/25340
This issue was triggered when running a highly parallel batch job with
FileSource pointed at a large S3 bucket. The visible symptom was:
java.util.NoSuchElementException
    at java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
    at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
    at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
    at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
    ...
While the exact sequence of events is unclear, there's an obvious
time-of-check/time-of-use bug in getRunningFetcher: the fetchers map may be
cleared between the emptiness check and the attempt to take the first
element.
Use an Iterator for both the emptiness check and the read. ConcurrentHashMap
iterators are weakly consistent, so they keep working even if the underlying
collection changes.
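
To make the pattern concrete, here is a minimal sketch of the two lookup styles. It is not the actual Flink code: the class `FetcherLookupSketch`, the methods `racyFirst` and `safeFirst`, and the `String` values standing in for fetchers are all hypothetical names chosen for illustration, and the racy variant is only a reconstruction from the description above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the fetcher lookup; NOT the actual Flink source.
final class FetcherLookupSketch {

    // Racy: the emptiness check and next() are two independent reads of the map.
    // If another thread clears the map in between, the fresh iterator is empty
    // and next() throws NoSuchElementException.
    static <T> T racyFirst(Map<Integer, T> fetchers) {
        return fetchers.isEmpty() ? null : fetchers.values().iterator().next();
    }

    // Safer: one iterator serves both the check and the read, so hasNext()
    // and next() refer to the same traversal state.
    static <T> T safeFirst(Map<Integer, T> fetchers) {
        Iterator<T> it = fetchers.values().iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        Map<Integer, String> fetchers = new ConcurrentHashMap<>();
        System.out.println(safeFirst(fetchers)); // empty map: null
        fetchers.put(0, "fetcher-0");
        System.out.println(safeFirst(fetchers)); // single entry: fetcher-0
    }
}
```

In a single thread both variants return the same result; the difference only shows up when another thread empties the map between the two reads in `racyFirst`.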
## What is the purpose of the change
There is a tiny race condition in `SingleThreadFetcherManager` that can
cause `java.util.NoSuchElementException` on reader close. This patch fixes the
race condition.
## Brief change log
Use a single iterator over `ConcurrentHashMap`'s `ValuesView` for both the
emptiness check and the element access; the iterator is weakly consistent
under concurrent modification.
## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
(Caveat: it would be possible to verify this using a multithreaded stress
test, but such tests usually need to run for some time to be useful. I did run
a reduced test case for `ConcurrentHashMap` to see that the fix behaves better
than the original code.)
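
The reduced test case itself is not part of this PR. As a rough idea of what such a stress test could look like, the sketch below runs a lookup in a loop while a second thread repeatedly adds and clears a single entry, and counts how often `NoSuchElementException` is thrown. Everything here (`FetcherRaceStressSketch`, `countFailures`, the iteration count) is a hypothetical illustration, not the test that was actually run.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical stress harness; names and iteration counts are assumptions.
final class FetcherRaceStressSketch {

    // Check-then-act lookup: two separate reads of the map.
    static String racyFirst(ConcurrentHashMap<Integer, String> m) {
        return m.isEmpty() ? null : m.values().iterator().next();
    }

    // Single-iterator lookup: check and read share one traversal.
    static String safeFirst(ConcurrentHashMap<Integer, String> m) {
        Iterator<String> it = m.values().iterator();
        return it.hasNext() ? it.next() : null;
    }

    // Calls the lookup `iterations` times while a mutator thread keeps adding
    // and clearing one entry; returns how many NoSuchElementExceptions occurred.
    static int countFailures(
            Function<ConcurrentHashMap<Integer, String>, String> lookup, int iterations) {
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
        AtomicBoolean stop = new AtomicBoolean(false);
        Thread mutator = new Thread(() -> {
            while (!stop.get()) {
                map.put(0, "fetcher-0");
                map.clear();
            }
        });
        mutator.start();
        int failures = 0;
        try {
            for (int i = 0; i < iterations; i++) {
                try {
                    lookup.apply(map);
                } catch (NoSuchElementException e) {
                    failures++;
                }
            }
        } finally {
            stop.set(true);
            try {
                mutator.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        int n = 2_000_000;
        System.out.println("racy failures: "
                + countFailures(FetcherRaceStressSketch::racyFirst, n));
        System.out.println("safe failures: "
                + countFailures(FetcherRaceStressSketch::safeFirst, n));
    }
}
```

The racy count depends on thread scheduling and may be zero on a short run, which is why a test like this needs many iterations to be informative. The single-iterator variant is expected to report no failures.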
## Does this pull request potentially affect one of the following parts:
- The S3 file system connector: actually, yes! But not in a way that
should be noticeable except for better stability.
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable