[
https://issues.apache.org/jira/browse/FLINK-38605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18042113#comment-18042113
]
Guotao Ding commented on FLINK-38605:
-------------------------------------
This feature is very valuable. I'm looking forward to it. [~zisaac]
> Feature: Stateless Ordered Async
> ---------------------------------
>
> Key: FLINK-38605
> URL: https://issues.apache.org/jira/browse/FLINK-38605
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Environment: *
> Reporter: Zach
> Priority: Major
> Attachments: image-2025-10-31-10-43-34-532.png,
> image-2025-10-31-10-57-10-971.png
>
>
> Today, Flink's built-in async operations require storing [operator
> state.|https://github.com/apache/flink/blob/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L226]
> This happens at the
> [AsyncWaitOperator|https://github.com/apache/flink/blob/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java]
> level, meaning it applies to ALL async functions using the built-in Flink
> async solution. This is not user-defined state; it is fundamental to the core
> workings of all Flink async functions today, whether those functions use
> additional user state or not.
> In my experience in enterprise, storing this kind of state is disruptive to
> users, especially cross-functional users just trying to build data pipelines
> without in-depth Flink experience. My claim is that a stateless version of
> async can be implemented. The issues with the current approach in large
> enterprise applications with cross-functional data pipeline partners are:
> * *Schema Can't Change*
> ** The current async state implementation requires storing entire records in
> Flink state
> ** Records in my enterprise are often [Flink
> Rows|https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/types/Row.html] or
> strongly typed columnar storage.
> ** These rows have [Flink
> RowTypeInfo|https://nightlies.apache.org/flink/flink-docs-release-2.1/api/java/org/apache/flink/api/java/typeutils/RowTypeInfo.html],
> i.e. information about each column and its type and how to serialize and
> deserialize it
> ** Users and cross-functional partners make schema (RowTypeInfo) changes
> all the time while building data pipelines. Making a schema change without a
> state migration or replay results in an ambiguous "serialization failed" or
> "deserialization failed" error message
> ** Users don't understand what's different. If they'd like to do a database
> read, hit a Large Language Model (LLM), or do anything else stateful, it's not
> clear to them why they'd need to handle schema migrations differently. The
> operations seem innocuous and don't intuitively suggest
> significant restrictions on future pipeline changes
> * *Managing state is complicated*
> ** State requires assigning a unique, unchanging operator ID to the
> operator so that state knows where to restore.
> ** Large state, especially in external file system state storage, is
> expensive. Storing entire rows in state can build to a large size relatively
> fast. State gets written every checkpoint, and in exactly once mode this
> means large processing delays.
> ** Users don't expect their seemingly simple async function to cause
> processing delays or long checkpoints
> * *There is risk of data loss*
> ** If a user changes an async operation, say they change the API endpoint
> they'd like to hit, what do we do with existing state? What if the pipeline
> hasn't run for a week, so the data in state is a week old? Do we try to
> restore it, meaning old data will be passed to the new endpoint? Do we allow
> non-restored state and change the operator UID to drop the old data, resulting
> in data loss?
> ** We could ask users, but many are cross-functional partners and aren't
> familiar with the inner workings of Flink. If we asked them if they'd like to restore
> old state, they'd be confused why such a question is even necessary to hit a
> database.
> * *Starting over is expensive*
> ** One way to fix the problem with state and schema changes is to discard
> all progress and start reading over from the beginning. This way all stored
> state will contain records with the correct schema
> ** Starting over is very expensive for users, especially with large data or
> with high realtime uptime requirements
>
> Given these constraints, I'd like to propose a *Stateless Async* operator
> that achieves similar async processing to the existing implementation,
> but without storing any built-in state. This solution works in my
> enterprise's local fork and has proven effective.
>
> *Why the existing solution needs state:*
> Today, the existing solution processes elements by adding them [to a
> queue|https://github.com/apache/flink/blame/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L256],
> then storing the [queue in Flink
> state|https://github.com/apache/flink/blame/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L300].
>
> Here is the flow today:
> !image-2025-10-31-10-43-34-532.png! As you can see, Flink state is needed
> today because records can be *in-progress* (in the queue and being executed
> in a separate thread), meaning they are not recorded anywhere else where
> standard Flink can make data guarantees like exactly-once.
>
> When a record is in progress in the queue, from Flink's perspective that
> data is *not*:
> * In a preceding input or output buffer.
> ** The record has been read out of the preceding buffers and the
> "processElement" call has succeeded (meaning we stored the record in the
> queue and fired an async invoke)
> ** If the data were in a buffer *before* the operator, the checkpoint barrier
> would not arrive at this operator until we finished processing the row.
> However, since we technically did finish processing the row (processElement
> returned), we can receive a checkpoint barrier before we are done processing
> all rows that came before it
> * In a downstream buffer or downstream operator.
> ** The record is in progress in a separate thread and has not been collected
> yet
> For these reasons, in the existing implementation, we can't allow the
> checkpoint barrier to be processed without storing all in-progress records in
> state; otherwise we risk a checkpoint succeeding, an async call failing, and
> having no easy way to backtrack and reprocess the data that failed.
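To make the above concrete, here is a minimal, hypothetical sketch of the pattern the linked AsyncWaitOperator follows (all class and method names below are invented for illustration; the real operator is far more involved): in-flight records sit in a queue, and on a checkpoint barrier the whole queue is copied into operator state so it can be replayed after a failure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified sketch of the stateful pattern described above.
class StatefulAsyncSketch<IN> {
    // In-flight elements awaiting completion of their async call.
    private final Deque<IN> queue = new ArrayDeque<>();

    void processElement(IN record) {
        queue.add(record);          // enqueue BEFORE firing the async call
        // asyncInvoke(record, ...) // would run on a separate thread
    }

    // Called when the checkpoint barrier arrives: every queued (in-flight)
    // record is copied into operator state so it can be replayed on restore.
    List<IN> snapshotState() {
        return new ArrayList<>(queue);
    }

    void onAsyncComplete(IN record) {
        // Once the async result has been collected downstream, drop the record.
        queue.remove(record);
    }
}
```

Note that `snapshotState` serializes whole records, which is exactly why a RowTypeInfo change between runs breaks restore.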
>
> *My proposal:*
> The current approach prevents us from losing data after a checkpoint by
> storing all in progress data in state. There is another way to prevent losing
> data that does not involve state, and that is halting the checkpoint until
> the queue empties. Here is my proposed flow:
> !image-2025-10-31-10-57-10-971.png! Why this works:
> * Either the entire queue succeeds in a timely manner, or we fail the
> checkpoint
> * If the entire queue succeeds, it does so before the checkpoint barrier
> gets processed. So all records will be in a downstream operator or a
> downstream buffer BEFORE the checkpoint barrier, and since all downstream
> operators also ensure all their records are processed before accepting a
> checkpoint barrier (this is standard Flink behavior), we are sure they will
> reach the sinks of the pipeline before this checkpoint is succeeded by every
> task.
> * If any record in the queue fails (after the async retry policy) or times out,
> the entire checkpoint fails and we resume reading from the sources at the
> next record after the last successful checkpoint, so we are guaranteed to
> process each record into the output exactly once (if enabled).
> How this solves the problems:
> * Users can change schema (like adding or removing a column) because the
> in-memory queue doesn't require serialization and its schema is determined at
> runtime on each startup. There are no records in state that can have a
> different schema than the new one, since there is no state
> * No state means no complex state management, and no long checkpoint times
> due to storing state in blob store
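The barrier-holding idea above can be sketched as follows (again with hypothetical names; in real Flink this would hook into the operator's snapshot path): instead of persisting the queue, the operator tracks each in-flight call as a future and blocks the checkpoint until all of them complete, failing the checkpoint on error or timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified sketch of the proposed stateless pattern.
class StatelessAsyncSketch<IN> {
    private final ConcurrentLinkedQueue<CompletableFuture<Void>> inFlight =
            new ConcurrentLinkedQueue<>();

    void processElement(IN record) {
        // Fire the async call and track its completion; nothing is persisted.
        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
            // asyncInvoke(record) would run here and emit its result downstream
        });
        inFlight.add(f);
    }

    // Called when the checkpoint barrier arrives: instead of snapshotting the
    // queue, block until every in-flight request completes. A failure or
    // timeout fails the whole checkpoint, so records are replayed from the
    // last successful checkpoint.
    void drainBeforeBarrier(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        CompletableFuture<Void> next;
        while ((next = inFlight.poll()) != null) {
            long remaining = Math.max(1, deadline - System.currentTimeMillis());
            try {
                next.get(remaining, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                // Surfacing the exception fails the checkpoint and triggers replay.
                throw new RuntimeException("checkpoint failed: async call unfinished", e);
            }
        }
        // Queue is empty: every result is downstream of the barrier.
    }
}
```

Because nothing ever crosses a serializer, no RowTypeInfo is frozen into state, which is the crux of the schema-change argument.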
> Cons of this approach:
> * Large duration outliers are more expensive. If one request takes much
> longer than the rest, it can hold up the entire pipeline, waiting for that one
> request to succeed before the checkpoint barrier can pass. In practice, most
> enterprise apps don't tend to have these outliers, but those that do should
> probably keep using the existing async solution.
> * More redundant computation of async requests. Currently, one failing
> request does not cause nearby requests to be re-computed. In the proposed
> approach, any single failure (after all retries) in a checkpoint will cause
> all records in that checkpoint to be recomputed. This is usually not an
> issue, since records can already be reprocessed during normal Flink failure
> recovery.
>
> Final tentative recommendation:
> Leave the existing async as an option, and implement a new stateless async.
> Stateless async will have different tradeoffs, but is arguably better for
> large enterprises with cross-functional partners building data pipelines.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)