[ 
https://issues.apache.org/jira/browse/FLINK-38746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-38746:
----------------------------
    Description: 
h2. Problem

{{RecordsWindowBuffer.addElement()}} catches {{EOFException}} and retries by calling itself recursively, causing a *StackOverflowError* when the retry keeps failing.

Exception:
{code:java}
java.lang.StackOverflowError
        at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:82)
        at org.apache.flink.table.data.writer.BinaryRowWriter.writeString(BinaryRowWriter.java:27)
        at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
        at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
        at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:98)
        at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
        at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
        at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
{code}
h2. Root Cause

The original code assumes that flushing will always free enough space for the retry to succeed. When the {{EOFException}} is not caused by a full buffer (an unrecoverable error), this assumption breaks and {{addElement()}} keeps re-invoking itself, leading to infinite recursion.
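
For context, the failure mode can be reproduced in miniature. The toy sketch below (plain Java, not the actual Flink source; {{FakeBuffer}} and its methods are invented for illustration) follows the same flush-and-recurse pattern reported above: when {{append()}} keeps failing for a reason a flush cannot fix, the recursive retry grows the stack until it overflows.
{code:java}
import java.io.EOFException;

// Toy illustration of the pre-fix pattern (not Flink code): the catch block
// re-enters addElement(), so a failure that flush() cannot fix recurses until
// the stack is exhausted, matching the repeated addElement frames above.
public class UnboundedRetryDemo {

    /** Minimal stand-in for the records buffer: append() always fails. */
    static class FakeBuffer {
        void flush() { /* frees nothing useful for this kind of failure */ }
        void append() throws EOFException {
            throw new EOFException("record too large for the buffer");
        }
    }

    static void addElement(FakeBuffer buffer) throws EOFException {
        try {
            buffer.append();
        } catch (EOFException e) {
            // Assumes flushing always frees enough space for the retry to succeed.
            buffer.flush();
            addElement(buffer); // unbounded recursion when that assumption fails
        }
    }

    public static void main(String[] args) throws EOFException {
        addElement(new FakeBuffer()); // eventually throws StackOverflowError
    }
}
{code}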
h2. Solution

Replace the recursive retry with a loop that uses {{numKeys == 0}} as the termination condition:

 
{code:java}
while (true) {
    LookupInfo<WindowKey, Iterator<RowData>> lookup = recordsBuffer.lookup(reuseWindowKey);
    try {
        recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element));
        break;
    } catch (EOFException e) {
        if (recordsBuffer.getNumKeys() == 0) {
            // Buffer is empty, retry won't help (unrecoverable error)
            throw e;
        }
        // Buffer has data, flush and retry
        flush();
        // Verify flush cleared the buffer
        checkState(
                recordsBuffer.getNumKeys() == 0,
                "The recordsBuffer should be empty after flushing.");
    }
}
{code}
 
h2. Why This Works
||Scenario||Behavior||
|Buffer full (recoverable)|numKeys>0 → flush → numKeys=0 → retry → success|
|Unrecoverable error (1st attempt)|numKeys=0 → throw immediately|
|Unrecoverable error (after flush retry)|numKeys>0 → flush → numKeys=0 → retry → fail again → numKeys=0 → throw|

Key point: {{flush()}} guarantees {{{}numKeys == 0{}}}, so if the retry fails again the termination condition is necessarily met and the exception is rethrown.
h2. Key Insight

{{flush()}} clears the buffer, making {{{}numKeys == 0{}}}. If the retry still fails after the flush, the catch block is entered again with {{numKeys == 0}}, which means the problem is not a full buffer but an unrecoverable error. In that case the exception should be thrown instead of retrying again.
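
To make this concrete, the toy sketch below (again plain Java with an invented {{FakeBuffer}}, not Flink code) applies the fixed loop to an append that always fails: the first {{EOFException}} triggers one flush, the second one is caught with {{numKeys == 0}} and rethrown, so the loop runs at most twice.
{code:java}
import java.io.EOFException;

// Toy illustration of the bounded flush-and-retry loop (not Flink code).
public class BoundedRetryDemo {

    /** Minimal stand-in for the records buffer: append() always fails. */
    static class FakeBuffer {
        private int numKeys = 1;            // pretend one key is already buffered
        int getNumKeys() { return numKeys; }
        void flush() { numKeys = 0; }       // flush() guarantees numKeys == 0
        void append() throws EOFException { // simulates an unrecoverable failure
            throw new EOFException("record too large for the buffer");
        }
    }

    static void addElement(FakeBuffer buffer) throws EOFException {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                buffer.append();
                break;
            } catch (EOFException e) {
                if (buffer.getNumKeys() == 0) {
                    // Second failure after a flush: the buffer is already empty,
                    // so retrying cannot help; propagate the exception instead.
                    System.out.println("giving up after " + attempts + " attempts");
                    throw e;
                }
                buffer.flush(); // recoverable path: free the buffer and retry once
            }
        }
    }

    public static void main(String[] args) {
        try {
            addElement(new FakeBuffer()); // prints "giving up after 2 attempts"
        } catch (EOFException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
{code}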
h2. Benefits
 # {*}Prevents StackOverflowError{*}: The while loop executes at most twice (initial attempt + one retry after flush), so there is no unbounded recursion
 # {*}Preserves normal behavior{*}: The usual flush-and-retry still works when the buffer is simply full
 # {*}Better diagnostics{*}: Unrecoverable errors surface the original EOFException with its full stack trace, making the root cause easy to identify
 # {*}Minimal change{*}: Only numKeys is checked; no additional state variables are introduced

  was:
Exception:
{code:java}
java.lang.StackOverflowError
        at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:82)
        at org.apache.flink.table.data.writer.BinaryRowWriter.writeString(BinaryRowWriter.java:27)
        at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
        at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
        at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:98)
        at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
        at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
        at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.addElement(RecordsWindowBuffer.java:103)
{code}


> `RecordsWindowBuffer.addElement()` catches `EOFException` and retries 
> recursively, causing StackOverflowError
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38746
>                 URL: https://issues.apache.org/jira/browse/FLINK-38746
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
