[ https://issues.apache.org/jira/browse/FLINK-38746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Fan resolved FLINK-38746.
-----------------------------
Fix Version/s: 2.3.0
Resolution: Fixed
Merged to master (2.3.0) via: 644f46145897654b0d48f5702fb5924baac9c0ec
> `RecordsWindowBuffer.addElement()` catches `EOFException` and retries
> recursively, causing StackOverflowError
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38746
> URL: https://issues.apache.org/jira/browse/FLINK-38746
> Project: Flink
> Issue Type: Bug
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.3.0
>
>
> h2. Problem
> {{RecordsWindowBuffer.addElement()}} catches {{EOFException}} and retries
> recursively, causing a *StackOverflowError* when the retry keeps failing.
> Stack trace:
> {code:java}
> java.lang.StackOverflowError
>     at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:82)
>     at org.apache.flink.table.data.writer.BinaryRowWriter.writeString(BinaryRowWriter.java:27)
>     at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
>     at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:98)
>     at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
>     at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
>     at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
> {code}
> h2. Root Cause
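> The original code assumes that {{flush()}} always frees enough space for the
> retry to succeed. Each retry is a recursive call to {{addElement()}} (line 103
> in the trace above), so when an unrecoverable error makes every attempt fail,
> the recursion never terminates and the stack grows until the JVM throws
> {{StackOverflowError}}.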
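A minimal sketch of why the recursion overflows, assuming a toy buffer whose {{append}} fails on every attempt. {{RecursiveRetryDemo}}, {{ToyBuffer}} and the {{depth}} counter are hypothetical names used only for illustration. The issue does not quote the pre-fix code, so this only models the catch, flush and call-itself-again shape visible in the stack trace (line 103 calling {{addElement}} repeatedly).

```java
import java.io.EOFException;

// Hypothetical toy model of the pre-fix pattern: catch EOFException,
// flush, then call addElement() again recursively. Not Flink code.
public class RecursiveRetryDemo {

    // Toy buffer whose append() always fails, modelling an unrecoverable error.
    static final class ToyBuffer {
        int numKeys = 0;

        void append(String record) throws EOFException {
            throw new EOFException("cannot append " + record);
        }

        void clear() {
            numKeys = 0;
        }
    }

    private final ToyBuffer recordsBuffer = new ToyBuffer();
    int depth = 0; // number of times addElement() has been entered

    void flush() {
        recordsBuffer.clear();
    }

    // No termination condition: every failure adds one more stack frame.
    void addElement(String record) {
        depth++;
        try {
            recordsBuffer.append(record);
        } catch (EOFException e) {
            flush();
            addElement(record);
        }
    }

    public static void main(String[] args) {
        RecursiveRetryDemo demo = new RecursiveRetryDemo();
        try {
            demo.addElement("row-1");
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError after " + demo.depth + " nested calls");
        }
    }
}
```

In this model, each failed attempt leaves another {{addElement}} frame on the stack, so the only exit is {{StackOverflowError}} and the {{EOFException}} that triggered the retries never reaches the caller.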
> h2. Solution
> Replace the recursion with a {{while}} loop and use {{numKeys == 0}} as the
> termination condition:
> {code:java}
> while (true) {
>     LookupInfo<WindowKey, Iterator<RowData>> lookup =
>             recordsBuffer.lookup(reuseWindowKey);
>     try {
>         recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element));
>         break;
>     } catch (EOFException e) {
>         if (recordsBuffer.getNumKeys() == 0) {
>             // Buffer is empty, so a flush cannot free any space:
>             // retrying won't help (unrecoverable error)
>             throw e;
>         }
>         // Buffer has data: flush it and retry
>         flush();
>         checkState(
>                 recordsBuffer.getNumKeys() == 0,
>                 "The recordsBuffer should be empty after flushing.");
>     }
> }
> {code}
> h2. Why This Works
> ||Scenario||Behavior||
> |Buffer full (recoverable)|numKeys>0 → flush → numKeys=0 → retry → success|
> |Unrecoverable error (1st attempt)|numKeys=0 → throw immediately|
> |Unrecoverable error (after flush retry)|numKeys>0 → flush → numKeys=0 → retry → fail again → numKeys=0 → throw|
> Key point: {{flush()}} clears the buffer, so {{numKeys == 0}} afterwards. If
> the retry still fails after the flush, the catch block is entered again with
> {{numKeys == 0}}, which shows that the failure is not caused by a full buffer
> but by an unrecoverable error. In that case the exception should be rethrown
> instead of retrying again.
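The loop can be checked against the three scenarios in the table with a small self-contained model. This is a sketch under stated assumptions: {{BoundedRetryDemo}} and {{ToyBuffer}} are hypothetical, records are reduced to byte sizes, and an "unrecoverable error" is modelled as a record larger than the whole buffer (the issue does not say which errors are unrecoverable in practice). Only the control flow of {{addElement}} mirrors the fix; {{checkState}} is replaced by a plain {{IllegalStateException}} to avoid a Flink dependency.

```java
import java.io.EOFException;

// Hypothetical toy model of the fixed retry loop; only the control flow of
// addElement() mirrors the fix. ToyBuffer is not Flink's records buffer.
public class BoundedRetryDemo {

    // Toy buffer: a byte budget plus a count of buffered keys.
    static final class ToyBuffer {
        private final int capacityBytes;
        private int usedBytes;
        private int numKeys;

        ToyBuffer(int capacityBytes) {
            this.capacityBytes = capacityBytes;
        }

        int getNumKeys() {
            return numKeys;
        }

        // Throws EOFException when the record does not fit in the remaining space.
        void append(int recordBytes) throws EOFException {
            if (usedBytes + recordBytes > capacityBytes) {
                throw new EOFException("record of " + recordBytes + " bytes does not fit");
            }
            usedBytes += recordBytes;
            numKeys++;
        }

        void clear() {
            usedBytes = 0;
            numKeys = 0;
        }
    }

    final ToyBuffer recordsBuffer;
    int attempts; // append attempts made by the most recent addElement() call

    BoundedRetryDemo(int capacityBytes) {
        this.recordsBuffer = new ToyBuffer(capacityBytes);
    }

    // Stand-in for flush(): a real buffer would emit its contents first.
    void flush() {
        recordsBuffer.clear();
    }

    void addElement(int recordBytes) throws EOFException {
        attempts = 0;
        while (true) {
            attempts++;
            try {
                recordsBuffer.append(recordBytes);
                break;
            } catch (EOFException e) {
                if (recordsBuffer.getNumKeys() == 0) {
                    throw e; // buffer already empty: flushing cannot free space
                }
                flush(); // buffer has data: free it and retry once more
                if (recordsBuffer.getNumKeys() != 0) {
                    throw new IllegalStateException(
                            "The recordsBuffer should be empty after flushing.");
                }
            }
        }
    }

    public static void main(String[] args) throws EOFException {
        // Row 1, buffer full: one flush, then the retry succeeds.
        BoundedRetryDemo full = new BoundedRetryDemo(10);
        full.addElement(6);
        full.addElement(6);
        System.out.println("buffer full: attempts=" + full.attempts); // attempts=2

        // Row 2, unrecoverable on the first attempt: the buffer is empty, throw at once.
        BoundedRetryDemo empty = new BoundedRetryDemo(10);
        try {
            empty.addElement(20);
        } catch (EOFException e) {
            System.out.println("empty buffer: attempts=" + empty.attempts); // attempts=1
        }

        // Row 3, unrecoverable after a flush: the retry fails on an empty buffer.
        BoundedRetryDemo partial = new BoundedRetryDemo(10);
        partial.addElement(6);
        try {
            partial.addElement(20);
        } catch (EOFException e) {
            System.out.println("after flush: attempts=" + partial.attempts); // attempts=2
        }
    }
}
```

The three printed attempt counts (2, 1 and 2) line up with the three table rows, and none exceeds two, since after one flush the buffer is empty and the next failure is rethrown.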
> h2. Benefits
> # {*}Prevents StackOverflowError{*}: the loop body runs at most twice (the
> initial attempt plus one retry after a flush), so retries are bounded
> # {*}Preserves normal behavior{*}: flush-and-retry still works when the
> buffer is full
> # {*}Better diagnostics{*}: an unrecoverable error surfaces as the original
> {{EOFException}} with its full stack trace instead of a StackOverflowError,
> making the root cause easy to identify
> # {*}Minimal change{*}: only the existing numKeys count is checked; no
> additional state variables are introduced
--
This message was sent by Atlassian Jira
(v8.20.10#820010)