[ https://issues.apache.org/jira/browse/FLINK-36146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881867#comment-17881867 ]

Kim Gräsman edited comment on FLINK-36146 at 9/16/24 7:02 AM:
--------------------------------------------------------------

Thank you!

Hmm, yeah, I don't understand the higher-level flow enough to say why this 
happens.

A few things that are maybe unusual about the runs where we saw this:
 * Flink 1.14.2 (i.e. bug could have been masked in newer releases by behavior 
changes "higher up")
 * Batch mode (I guess there's more distinct shutdown in batch mode when a task 
completes, but it's not really clear to me)
 * High parallelism (I think these runs had -p 1600. Not sure if that affects 
something?)
 * Many input files (I think I counted 400+ billion records, and I believe each 
record is a file in this archive)

So it's a big, old batch job.


> NoSuchElement exception from SingleThreadFetcherManager
> -------------------------------------------------------
>
>                 Key: FLINK-36146
>                 URL: https://issues.apache.org/jira/browse/FLINK-36146
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>         Environment: AWS EMR/Yarn
>            Reporter: Kim Gräsman
>            Priority: Minor
>
> We're running Flink 1.14.2, but this appears to be an issue still on 
> mainline, so I thought I'd report it.
> When running with high parallelism, we've noticed a spurious error triggered 
> by a FileSource reader from S3:
> {code:java}
> 2024-08-19 15:23:07,044 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
> reading split(s) [0000543131]
> 2024-08-19 15:23:07,044 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished reading from splits [0000543131]
> 2024-08-19 15:23:07,044 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] 
> - Closing splitFetcher 157 because it is idle.
> 2024-08-19 15:23:07,045 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Shutting down split fetcher 157
> 2024-08-19 15:23:07,045 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split 
> fetcher 157 exited.
> 2024-08-19 15:23:07,048 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
> split(s) to reader: [FileSourceSplit: ... [0, 21679984)  hosts=[localhost] 
> ID=0000201373 position=null]
> 2024-08-19 15:23:07,064 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
> Source Reader.
> 2024-08-19 15:23:07,069 WARN  org.apache.flink.runtime.taskmanager.Task       
>              [] - Source: ... -> ... (114/1602)#0 (...) switched from RUNNING 
> to FAILED with failure cause: java.util.NoSuchElementException
>         at 
> java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
>         at 
> org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
>         at 
> org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
>         at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
>         at 
> org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
>         at 
> org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:70)
>         at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:83)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1473)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>         at java.base/java.lang.Thread.run(Thread.java:829) {code}
> I believe this may be caused by a tiny TOCTOU (time-of-check to time-of-use) 
> race in {{{}SingleThreadFetcherManager{}}}. I'll admit that I don't fully 
> understand what the execution flows through that code look like, but the use 
> of atomic and synchronized indicate that it's used by multiple threads. If 
> that's not the case, this report can be safely ignored.
> The backtrace points to 
> [https://github.com/apache/flink/blob/4faf0966766e3734792f80ed66e512aa3033cacd/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java#L165]
> And it looks like the concurrent hash map might be modified between the check 
> for {{isEmpty()}} and the call to {{{}fetchers.values().iterator().next(){}}}.
> I would suggest Python-style:
> {code:java}
> try {
>   return fetchers.values().iterator().next();
> } catch (NoSuchElementException e) {
>   return null;
> }{code}
> here instead, which should let {{ConcurrentHashMap}} handle its 
> synchronization internally.
> For some reason we were able to reproduce consistently with 2 Task Managers 
> and 2 slots per node, but not with 1 Task Manager and 4 slots, if that helps 
> construct a repro test case (presumably more interlocking from 
> {{synchronized}} in a single-TM environment, but not sure).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
