[
https://issues.apache.org/jira/browse/FLINK-38746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Fan updated FLINK-38746:
----------------------------
Description:
h2. Problem
{{RecordsWindowBuffer.addElement()}} catches {{EOFException}} and retries
recursively, causing a *StackOverflowError* when the retry keeps failing.
Exception:
{code:java}
java.lang.StackOverflowError
	at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:82)
	at org.apache.flink.table.data.writer.BinaryRowWriter.writeString(BinaryRowWriter.java:27)
	at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
	at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:98)
	at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
	at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
	at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
{code}
h2. Root Cause
The original code assumes that "flush will always free enough space for the retry
to succeed". When an unrecoverable error occurs, this assumption fails and the
recursive retry never terminates.
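The failing pattern can be reproduced in isolation. Below is a minimal, self-contained model of the pre-fix recursive retry inferred from the stack trace; the class and method names are illustrative, not the actual Flink code:

{code:java}
import java.io.EOFException;

// Minimal model of the pre-fix retry (all names here are illustrative, not
// the real Flink classes). append() stands in for a write that fails for a
// reason flushing cannot fix, so each retry recurses one level deeper until
// the stack overflows, mirroring RecordsWindowBuffer.addElement:103 above.
public class RecursiveRetrySketch {

    static void append() throws EOFException {
        // An unrecoverable failure: flushing would not create space here.
        throw new EOFException("write failed");
    }

    static void addElement() throws EOFException {
        try {
            append();
        } catch (EOFException e) {
            // The original code flushed here and then retried recursively.
            // Since append() keeps failing, this recursion never terminates.
            addElement();
        }
    }
}
{code}

Calling {{addElement()}} in this model ends in a {{StackOverflowError}}, matching the trace above.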
h2. Solution
Use {{numKeys == 0}} as the termination condition:
{code:java}
while (true) {
    LookupInfo<WindowKey, Iterator<RowData>> lookup = recordsBuffer.lookup(reuseWindowKey);
    try {
        recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element));
        break;
    } catch (EOFException e) {
        if (recordsBuffer.getNumKeys() == 0) {
            // Buffer is empty, retry won't help (unrecoverable error)
            throw e;
        }
        // Buffer has data, flush and retry
        flush();
        // Verify flush cleared the buffer
        checkState(
                recordsBuffer.getNumKeys() == 0,
                "The recordsBuffer should be empty after flushing.");
    }
}
{code}
h2. Why This Works
||Scenario||Behavior||
|Buffer full (recoverable)|numKeys>0 → flush → numKeys=0 → retry → success|
|Unrecoverable error (1st attempt)|numKeys=0 → throw immediately|
|Unrecoverable error (after flush retry)|numKeys>0 → flush → numKeys=0 → retry → fail again → numKeys=0 → throw|
Key point: {{flush()}} guarantees {{numKeys == 0}}, so a retry failure always
meets the termination condition.
h2. Key Insight
{{flush()}} clears the buffer, so {{numKeys == 0}} holds afterwards. If the
retry still fails after a flush, the catch block is entered again with
{{numKeys == 0}}, which means the failure was not caused by a full buffer but
by an unrecoverable error. In that case the exception should be thrown instead
of retrying forever.
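This termination argument can be checked with a small stand-alone model of the fixed loop; {{FakeBuffer}} and its fields below are illustrative stand-ins, not the Flink API:

{code:java}
import java.io.EOFException;

// Stand-alone model of the fixed loop (FakeBuffer is illustrative, not the
// Flink API). append() fails while the buffer is "full" or when configured to
// always fail; flush() empties it, so seeing numKeys == 0 in the catch block
// proves that flushing cannot help and the exception must propagate.
public class RetryLoopSketch {

    static class FakeBuffer {
        int numKeys;
        final int capacity;
        final boolean alwaysFail; // models an unrecoverable write error

        FakeBuffer(int numKeys, int capacity, boolean alwaysFail) {
            this.numKeys = numKeys;
            this.capacity = capacity;
            this.alwaysFail = alwaysFail;
        }

        void append() throws EOFException {
            if (alwaysFail) {
                throw new EOFException("unrecoverable write error");
            }
            if (numKeys >= capacity) {
                throw new EOFException("buffer full");
            }
            numKeys++;
        }

        void flush() {
            numKeys = 0;
        }
    }

    /** Returns the number of append attempts: at most two before success or rethrow. */
    static int addElement(FakeBuffer buffer) throws EOFException {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                buffer.append();
                return attempts;
            } catch (EOFException e) {
                if (buffer.numKeys == 0) {
                    // Buffer is empty, retry won't help (unrecoverable error)
                    throw e;
                }
                // Buffer has data, flush and retry
                buffer.flush();
            }
        }
    }
}
{code}

In this model a full buffer needs exactly two attempts, an empty buffer one, and an unrecoverable error is rethrown after at most one flush, covering the three table scenarios above.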
h2. Benefits
# {*}Prevents StackOverflowError{*}: the while loop runs at most twice (the
initial attempt plus one retry after a flush), so there is no unbounded retry
# {*}Preserves normal behavior{*}: the usual flush-and-retry path still works
when the buffer is merely full
# {*}Better diagnostics{*}: unrecoverable errors surface the original
{{EOFException}} with its full stack trace, making the root cause easy to identify
# {*}Minimal change{*}: only {{numKeys}} is checked; no additional state
variables are introduced
was:
Exception:
{code:java}
java.lang.StackOverflowError
	at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:82)
	at org.apache.flink.table.data.writer.BinaryRowWriter.writeString(BinaryRowWriter.java:27)
	at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
	at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:98)
	at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
	at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
	at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
{code}
> `RecordsWindowBuffer.addElement()` catches `EOFException` and retries recursively, causing StackOverflowError
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38746
> URL: https://issues.apache.org/jira/browse/FLINK-38746
> Project: Flink
> Issue Type: Bug
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)