[ https://issues.apache.org/jira/browse/FLINK-37663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18089422#comment-18089422 ]
Martijn Visser commented on FLINK-37663:
----------------------------------------
I reproduced this lost wakeup and validated the fix using the Fray
controlled-concurrency tester (https://github.com/cmu-pasta/fray), which takes
over the JVM thread scheduler and systematically explores interleavings, then
emits a deterministic replay of any failing schedule.
I'm opening a PR with a deterministic, dependency-free regression test
(`FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter`) that
reproduces the stall without Fray and is verified red-green (fails before the
fix, passes after).
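
For readers who want to follow the stall without the Flink sources, here is a
minimal single-threaded sketch of the bookkeeping described in the issue below.
It is a toy model, not Flink's code and not the regression test from the PR:
`LostSignalModel`, `Waiter`, `fetcherBSignaled` and the `removeOnWakeUp` switch
are hypothetical names, and the "fix" it models is only the reporter's suggested
removal from `notFull`, which may differ from what the PR actually does.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy, single-threaded model of the notFull bookkeeping; not Flink's actual code. */
final class LostSignalModel {

    /** Hypothetical stand-in for one fetcher's condition variable. */
    static final class Waiter {
        boolean waiting;   // true while the fetcher is parked in waitOnPut()
        boolean signaled;  // true once a signal reached a parked fetcher

        void signal() {
            // Like Condition.signal(): has no effect if nobody is waiting.
            if (waiting) {
                waiting = false;
                signaled = true;
            }
        }
    }

    private final Deque<Waiter> notFull = new ArrayDeque<>();
    private final boolean removeOnWakeUp; // assumption: models the suggested fix

    LostSignalModel(boolean removeOnWakeUp) {
        this.removeOnWakeUp = removeOnWakeUp;
    }

    /** A putter finds the queue full and registers itself as a waiter. */
    void waitOnPut(Waiter w) {
        w.waiting = true;
        notFull.add(w);
    }

    /** Manual wake-up: signals the waiter and, only if enabled, deregisters it. */
    void wakeUpPuttingThread(Waiter w) {
        w.signal();
        if (removeOnWakeUp) {
            notFull.remove(w);
        }
    }

    /** Called after a dequeue frees space: signals whoever is at the head. */
    void signalNextPutter() {
        Waiter head = notFull.poll();
        if (head != null) {
            head.signal();
        }
    }

    /** Replays the scenario from the issue and reports whether Fetcher B was woken. */
    static boolean fetcherBSignaled(boolean removeOnWakeUp) {
        LostSignalModel queue = new LostSignalModel(removeOnWakeUp);
        Waiter fetcherA = new Waiter();
        Waiter fetcherB = new Waiter();

        queue.waitOnPut(fetcherA);            // A blocks on the full queue
        queue.wakeUpPuttingThread(fetcherA);  // A is woken manually and leaves put()
        queue.waitOnPut(fetcherB);            // B blocks on the still-full queue
        queue.signalNextPutter();             // consumer frees one slot

        return fetcherB.signaled;
    }

    public static void main(String[] args) {
        System.out.println("without removal: B signaled = " + fetcherBSignaled(false));
        System.out.println("with removal:    B signaled = " + fetcherBSignaled(true));
    }
}
```

In this model, a stale entry for Fetcher A at the head of `notFull` absorbs the
only signal, so Fetcher B stays parked unless the entry is removed on wake-up.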
> Thread Synchronization Issue in FutureCompletingBlockingQueue <> Connector
> Split Fetching
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-37663
> URL: https://issues.apache.org/jira/browse/FLINK-37663
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.18.1
> Reporter: Herbert Wang
> Priority: Major
> Labels: pull-request-available
>
> h1. Potential Thread Synchronization Issue in FutureCompletingBlockingQueue and Connector Split Fetching (Flink 1.18)
> *Context:* We are investigating potential sources of unresponsiveness or
> deadlocks related to connector split fetching in Flink 1.18, and our analysis
> points towards a possible synchronization issue within the
> {{FutureCompletingBlockingQueue}}. We are not certain this is the definitive
> cause, but we wanted to share our findings and reasoning for expert review.
> h2. Suspected Problem Description
> We suspect a potential thread synchronization issue exists in the
> {{FutureCompletingBlockingQueue}}'s interaction with split fetchers,
> specifically concerning the {{wakeUpPuttingThread()}} method. Our concern is
> that when this method manually wakes up a putting thread, it signals the
> associated condition variable but might not remove it from the internal
> {{notFull}} waiting queue. This could potentially lead to an inconsistent
> state where subsequent {{signalNextPutter()}} calls might signal an
> already-awoken or non-waiting thread's condition, effectively causing a "lost
> signal" for a genuinely waiting thread.
> h2. Hypothesized Scenario Illustrating the Concern
> Consider the following sequence of events based on our understanding of the
> code:
> # *Initial State*: The element queue (capacity 1) is full. A fetcher (Fetcher A)
> has completed its task, fetched records, and is blocked attempting to
> {{put()}} elements into the full queue.
> #* Fetcher A's thread calls {{waitOnPut()}} and goes to sleep.
> #* Its condition variable is added to the {{notFull}} queue.
> # *External Wake-up Trigger*: The {{SplitFetcherManager}} (or similar
> component) calls {{SplitFetcher.addSplits()}}, which eventually leads to a
> task being enqueued and a wake-up signal.
> #* Path: {{addSplits -> enqueueTaskUnsafe -> wakeUpUnsafe ->
> currentTask.wakeUp()}} (assuming Fetcher A was executing {{currentTask}}).
> # *Fetcher Wake-up Path*:
> #* {{FetchTask.wakeUp()}} sets an internal {{wakeup}} flag to {{true}}.
> #* It then calls {{elementsQueue.wakeUpPuttingThread(fetcherIndex)}} for
> Fetcher A.
> # *Potential Inconsistency Point*: Inside
> {{wakeUpPuttingThread(fetcherIndex)}}:
> #* The corresponding {{fetcherIndex}}'s wake-up flag is set to {{true}}.
> #* The associated condition variable (Fetcher A's) is signaled.
> #* Fetcher A's thread wakes up from {{waitOnPut()}}, likely checks its
> wake-up flag, and returns {{false}} from {{put()}}.
> #* *Key Concern*: Our analysis suggests that the condition variable for
> Fetcher A might *remain* in the {{notFull}} queue at this point, as
> {{wakeUpPuttingThread}} doesn't appear to remove it.
> # *Fetcher State Change*: Fetcher A, having returned from {{put()}} (possibly
> due to the wake-up), might subsequently be closed or enter an idle state.
> # *New Fetcher Blocks*: A different fetcher (Fetcher B) becomes active,
> fetches data, and attempts to {{put()}} elements into the queue, which is
> still full.
> #* Fetcher B calls {{waitOnPut()}}.
> #* Its condition variable is added to the {{notFull}} queue (potentially
> *after* Fetcher A's condition, if it remained).
> #* Fetcher B's thread goes to sleep.
> # *Consumer Action*: The source reader thread consumes an element from the
> queue via {{poll()}}.
> #* Path: {{getNextFetch() -> elementsQueue.poll() -> dequeue()}}
> # *Signaling Attempt*: Since the queue was full and is now not full,
> {{dequeue()}} calls {{signalNextPutter()}}.
> #* Path: {{dequeue() -> signalNextPutter() -> notFull.poll().signal()}}
> # *Potential Lost Signal*: *If* Fetcher A's condition remained in the
> {{notFull}} queue (as suspected in step 4) and is at the head of the queue,
> {{notFull.poll()}} will retrieve and signal Fetcher A's condition variable.
> #* This signal might be effectively lost because Fetcher A is no longer
> waiting on that condition (it was woken up manually or might even be closed).
> #* Fetcher B, which *is* genuinely waiting for space, remains asleep because
> its condition variable was not polled and signaled.
> #* This could lead to Fetcher B (and potentially others) never being woken
> up, resulting in stalled data fetching or apparent deadlocks.
> h2. Suggested Area for Investigation / Potential Fix
> Based on this hypothesis, the potential inconsistency seems to stem from
> {{wakeUpPuttingThread}} not removing the condition from the {{notFull}}
> queue. If this analysis is correct, a possible solution could involve
> ensuring the condition is removed when a thread is woken up manually via this
> path. For example, adding a line similar to:
> {code:java}
> // Inside wakeUpPuttingThread, after retrieving caf:
> // Conceptual only; requires correct implementation details.
> notFull.remove(caf.condition());
> {code}
> This would aim to keep the {{notFull}} queue consistent with only genuinely
> waiting threads' conditions.
> *Request for Review:* We would greatly appreciate it if developers familiar
> with Flink's connector concurrency mechanisms could review this analysis. We
> are open to corrections if our understanding of the execution flow or
> synchronization logic is inaccurate. Our goal is to help identify or rule out
> this potential issue.