[ 
https://issues.apache.org/jira/browse/FLINK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800809#comment-17800809
 ] 

Martijn Visser commented on FLINK-33933:
----------------------------------------

[~KarlManong] I believe the asyncLookup() method in LookupFunctionA is failing 
with an IOException, which Flink then wraps in a SerializedThrowable. The stack 
trace shows the SerializedThrowable constructor and addAllSuppressed() calling 
each other repeatedly, so the wrapping recurses without terminating and ends in 
a StackOverflowError.

To fix this issue, make sure the asyncLookup() method in LookupFunctionA never 
throws an exception directly. If the lookup fails, return a CompletableFuture 
that is completed exceptionally with that exception.
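
As a rough illustration of that pattern, here is a minimal sketch that uses only 
java.util.concurrent so it runs without Flink on the classpath. The class name 
SafeAsyncLookup, the fetch() helper and the String row type are hypothetical 
stand-ins; a real implementation would extend AsyncLookupFunction and return 
Collection<RowData>.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SafeAsyncLookup {

    // Hypothetical stand-in for the real backend call; it may throw synchronously.
    static Collection<String> fetch(String key) throws IOException {
        if (key.isEmpty()) {
            throw new IOException("request failed");
        }
        return List.of("row-for-" + key);
    }

    // Never throws: every failure is reported through the returned future.
    static CompletableFuture<Collection<String>> asyncLookup(String key) {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        try {
            future.complete(fetch(key));
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        // Successful lookup: the future completes normally with the rows.
        System.out.println(asyncLookup("a").join());
        // Failed lookup: the future carries the IOException instead of throwing it.
        System.out.println(asyncLookup("").isCompletedExceptionally());
    }
}
```

The caller sees the failure only when it inspects or joins the future, so the 
exception stays inside the asynchronous result rather than escaping from 
asyncLookup() itself.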

> SerializedThrowable will be java.lang.StackOverflowError when 
> AsyncLookupFunction throw an exception
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33933
>                 URL: https://issues.apache.org/jira/browse/FLINK-33933
>             Project: Flink
>          Issue Type: Bug
>         Environment: Tested from 1.16 to 1.18; same behavior in all versions
>            Reporter: KarlManong
>            Priority: Minor
>
> Here is a simple example
> {code:java}
> // Lookup table source that optionally wraps the async function in a cache
> public class TableA implements LookupTableSource {
>     @Nullable
>     private final LookupCache cache;
>
>     public TableA(@Nullable LookupCache cache) {
>         this.cache = cache;
>     }
>
>     @Override
>     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
>         LookupFunctionA lookupFunction = new LookupFunctionA();
>         if (cache != null) {
>             return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
>         } else {
>             return AsyncLookupFunctionProvider.of(lookupFunction);
>         }
>     }
>
>     @Override
>     public DynamicTableSource copy() {
>         return new TableA(cache);
>     }
>
>     @Override
>     public String asSummaryString() {
>         return "Async Table";
>     }
> }
>
> // Async lookup function whose future always completes exceptionally
> public class LookupFunctionA extends AsyncLookupFunction {
>     @Override
>     public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
>         CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
>         future.completeExceptionally(new IOException("request failed"));
>         return future;
>     }
> }
> {code}
> When using TableA, a StackOverflowError occurs:
>  
> {code:java}
> java.lang.StackOverflowError
>     at java.base/java.lang.Exception.<init>(Exception.java:66)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:66)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>     at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>     at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> {code}



