[ 
https://issues.apache.org/jira/browse/FLINK-38605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18042113#comment-18042113
 ] 

Guotao Ding commented on FLINK-38605:
-------------------------------------

This feature is very valuable. I'm looking forward to it. [~zisaac] 

> Feature: Stateless Ordered Async 
> ---------------------------------
>
>                 Key: FLINK-38605
>                 URL: https://issues.apache.org/jira/browse/FLINK-38605
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>         Environment: *  
>            Reporter: Zach
>            Priority: Major
>         Attachments: image-2025-10-31-10-43-34-532.png, 
> image-2025-10-31-10-57-10-971.png
>
>
> Today, Flink built-in async operations require storing [operator 
> state.|https://github.com/apache/flink/blob/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L226]
>  This happens in the 
> [AsyncWaitOperator|https://github.com/apache/flink/blob/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java]
>  level, meaning it applies to ALL async functions using the built-in Flink 
> async solution. This is not user-defined state; it is fundamental to the core 
> working of all Flink async functions today, whether those functions use 
> additional user state or not. 
> In my experience in enterprise settings, storing this kind of state is 
> disruptive to users, especially cross-functional users just trying to build 
> data pipelines without in-depth Flink experience. My claim is that a version 
> of async can be implemented without this built-in state. The issues with the 
> current approach in large enterprise applications with cross-functional data 
> pipeline partners are:
>  * *Schema Can't Change*
>  ** The current async state implementation requires storing entire records in 
> Flink state
>  ** Records in my enterprise are often [Flink Rows|https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/types/Row.html] or strongly typed columnar storage. 
>  ** These rows have [Flink RowTypeInfo|https://nightlies.apache.org/flink/flink-docs-release-2.1/api/java/org/apache/flink/api/java/typeutils/RowTypeInfo.html], which describes each column, its type, and how to serialize and deserialize it. 
>  ** Users and cross-functional partners make schema (RowTypeInfo) changes 
> all the time while building data pipelines. Making a schema change without a 
> state migration or replay results in an ambiguous "serialization failed" or 
> "deserialization failed" error message 
>  ** Users don't understand what's different. If they'd like to do a database 
> read, hit a Large Language Model (LLM), or do anything else stateful, it's not 
> clear to them why they'd need to handle schema migrations differently. The 
> operations seem innocuous and don't intuitively suggest significant 
> restrictions on future pipeline changes
>  * *Managing state is complicated*
>  ** State requires assigning a unique, unchanging operator ID to the 
> operator, so that restored state can be matched back to it. 
>  ** Large state, especially in external file system state storage, is 
> expensive. Storing entire rows in state can grow to a large size relatively 
> fast. State gets written every checkpoint, and in exactly-once mode this 
> means long processing delays. 
>  ** Users don't expect their seemingly simple async function to cause 
> processing delays or long checkpoints
>  * *There is risk of data loss* 
>  ** If a user changes an async operation, say they change the API endpoint 
> they'd like to hit, what do we do with existing state? What if the pipeline 
> hasn't run for a week, so the data in state is a week old? Do we try to 
> restore it, meaning old data will be passed to the new endpoint? Do we allow 
> non-restored state and change the operator UID to drop the old data, 
> resulting in data loss? 
>  ** We could ask users, but many are cross-functional partners who aren't 
> familiar with the inner workings of Flink. If we asked them whether they'd 
> like to restore old state, they'd be confused why such a question is even 
> necessary just to hit a database. 
>  * *Starting over is expensive* 
>  ** One way to fix the problem with state and schema changes is to discard 
> all progress and start reading over from the beginning. This way all stored 
> state will contain records with the correct schema 
>  ** Starting over is very expensive for users, especially with large data or 
> with high realtime uptime requirements 
>  
> Given these constraints, I'd like to propose a *Stateless Async* operator 
> that achieves similar async processing to the existing async implementation, 
> but without storing any built-in state. This solution works in my 
> enterprise's local fork and has proven effective.
>  
> *Why the existing solution needs state:*
> Today, the existing solution processes elements by adding them [to a 
> queue|https://github.com/apache/flink/blame/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L256],
>  then storing the [queue in Flink 
> state|https://github.com/apache/flink/blame/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L300].
>  
> Here is the flow today:
> !image-2025-10-31-10-43-34-532.png! As you can see, Flink state is needed 
> today because records can be *in-progress* (in the queue and being executed 
> in a separate thread), meaning they are not tracked anywhere else where 
> standard Flink is able to make data guarantees like exactly-once. 
>  
> When a record is in progress in the queue, from Flink's perspective that 
> data is *not*:
>  * In a preceding input or output buffer.
>  ** The record has been read out of the preceding buffers and the 
> "processElement" call has succeeded (meaning we stored the record in the 
> queue and called an async invoke)
>  ** If the data were in a buffer *before* the operator, the checkpoint barrier 
> would not have arrived at this operator until we finished processing the row. 
> However, since we technically did finish processing the row (processElement 
> returned), we can receive a checkpoint barrier before we are done processing 
> all rows that came before it 
>  * In a downstream buffer or downstream operator 
>  ** The record is in progress in a separate thread and has not been collected 
> yet
> For these reasons, in the existing implementation, we can't allow the 
> checkpoint barrier to be processed without storing all in-progress records in 
> state; otherwise we risk a checkpoint succeeding, an async call failing, and 
> having no easy way to backtrack and reprocess the data that failed. 
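> To make this concrete, here is a much-simplified, self-contained model of the 
> stateful design in plain Java (illustration only, with made-up class and 
> method names, not the actual AsyncWaitOperator code): processElement returns 
> as soon as the record is enqueued, so a barrier can arrive while records are 
> still in flight.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Simplified model of today's stateful async design (illustrative names,
// not real Flink code). processElement enqueues the record and returns
// immediately; the record leaves the queue only when its async call
// completes. In-flight records exist neither upstream nor downstream,
// which is why the real operator must persist this queue in Flink state.
public class StatefulAsyncModel {
    private final Set<String> inFlightQueue = new LinkedHashSet<>();

    // Returns a future that the caller completes when the async call finishes.
    public CompletableFuture<Void> processElement(String record) {
        inFlightQueue.add(record);
        CompletableFuture<Void> asyncCall = new CompletableFuture<>();
        asyncCall.thenRun(() -> inFlightQueue.remove(record)); // emit + dequeue
        return asyncCall;
    }

    // On a checkpoint, the current design persists exactly this queue.
    public List<String> snapshotState() {
        return List.copyOf(inFlightQueue);
    }

    public static void main(String[] args) {
        StatefulAsyncModel op = new StatefulAsyncModel();
        CompletableFuture<Void> call1 = op.processElement("row-1");
        op.processElement("row-2");
        call1.complete(null); // row-1's async call finishes and is emitted
        // A barrier arriving now finds row-2 in flight: it must go to state.
        System.out.println(op.snapshotState()); // prints [row-2]
    }
}
```

> If the checkpoint succeeded without persisting that queue and the job then 
> failed, row-2 would be gone: upstream considers it processed, downstream 
> never saw it.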
>  
> *My proposal:*
> The current approach prevents us from losing data after a checkpoint by 
> storing all in-progress data in state. There is another way to prevent losing 
> data that does not involve state: halting the checkpoint until the queue 
> empties. Here is my proposed flow:
> !image-2025-10-31-10-57-10-971.png! Why does this work?
>  * Either the entire queue succeeds in a timely manner, or we fail the 
> checkpoint 
>  * If the entire queue succeeds, it does so before the checkpoint barrier 
> gets processed. So all records will be in a downstream operator or a 
> downstream buffer BEFORE the checkpoint barrier, and since all downstream 
> operators also ensure all their records are processed before accepting a 
> checkpoint barrier (this is standard Flink behavior), we are sure they will 
> reach the sinks of the pipeline before this checkpoint is succeeded by every 
> task. 
>  * If any record in the queue fails (after the async retry policy) or times 
> out, the entire checkpoint fails and we start over reading from the sources 
> at the next record after the last successful checkpoint, so we are guaranteed 
> to process each record into the output exactly once (if enabled). 
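> A minimal drain-or-fail sketch of this proposal in plain Java (assumed 
> design with illustrative names, not actual Flink code): when the barrier 
> arrives, we wait for the queue to empty within a timeout, and otherwise 
> report failure so the checkpoint is declined and the sources replay.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the proposed stateless approach (assumed design, not existing
// Flink code): instead of persisting in-flight records, the checkpoint
// barrier is held back until every pending async call has completed. If the
// queue does not drain within the timeout, the checkpoint fails and Flink
// restarts from the previous one, re-reading the records from the source.
public class StatelessAsyncModel {
    private final ConcurrentLinkedQueue<CompletableFuture<Void>> inFlight =
            new ConcurrentLinkedQueue<>();

    // The record would be handed to the user's async function here; the
    // caller completes the returned future when that call finishes.
    public CompletableFuture<Void> processElement(String record) {
        CompletableFuture<Void> asyncCall = new CompletableFuture<>();
        inFlight.add(asyncCall);
        return asyncCall;
    }

    // Called when the checkpoint barrier arrives: drain the queue or fail.
    public boolean awaitQueueDrained(long timeoutMillis) {
        CompletableFuture<Void> all = CompletableFuture.allOf(
                inFlight.toArray(new CompletableFuture[0]));
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
            inFlight.clear();
            return true;  // barrier may pass; there is nothing to store
        } catch (Exception e) {
            return false; // fail the checkpoint, replay from the source
        }
    }

    public static void main(String[] args) {
        StatelessAsyncModel op = new StatelessAsyncModel();
        CompletableFuture<Void> a = op.processElement("row-1");
        CompletableFuture<Void> b = op.processElement("row-2");
        a.complete(null);
        b.complete(null); // queue drains before the barrier
        System.out.println(op.awaitQueueDrained(100)); // true
        op.processElement("row-3"); // this async call never completes
        System.out.println(op.awaitQueueDrained(100)); // false
    }
}
```

> Here awaitQueueDrained stands in for the snapshot path: returning false 
> models failing the checkpoint, after which Flink would restart from the last 
> successful one and re-read the in-flight records from the source.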
> How does this solve the problems?
>  * Users can change schema (like add or remove a column) because the queue 
> lives in Java memory: it doesn't require serialization, and its schema is 
> determined at runtime on each startup. There are no records in state that can 
> have a different schema than the new one, since there is no state 
>  * No state means no complex state management, and no long checkpoint times 
> due to storing state in blob store 
> Cons of this approach:
>  * Large duration outliers are more expensive. If one request takes much 
> longer than the rest, it can hold up the entire pipeline, waiting for that 
> one request to succeed before the checkpoint barrier can pass. In practice, 
> most enterprise apps don't tend to have these outliers, but some might and 
> should probably keep using the existing async solution. 
>  * More redundant computation of async requests. Currently, one failing 
> request does not cause all the ones nearby to be re-computed. In the proposed 
> approach, any single failure (after all retries) in a checkpoint will cause 
> all records in that checkpoint to be recomputed. This is usually not an 
> issue, since records can already get retried in normal Flink execution.  
>  
> Final tentative recommendation:
> Leave the existing async as an option, and implement a new stateless async. 
> Stateless async will have different tradeoffs, but it is arguably better for 
> large enterprises with cross-functional partners building pipelines. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
