[
https://issues.apache.org/jira/browse/FLINK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800809#comment-17800809
]
Martijn Visser commented on FLINK-33933:
----------------------------------------
[~KarlManong] I believe that the asyncLookup() method in LookupFunctionA is
throwing an exception, specifically an IOException, which is then being wrapped
in a SerializedThrowable by Flink's serialization mechanism. However, the
SerializedThrowable class is also serializable, so when Flink tries to
serialize it, it ends up causing a StackOverflowError due to infinite recursion.
To fix this issue, you should ensure that the asyncLookup() method in
LookupFunctionA does not throw an exception. If an exception occurs during the
lookup, you should return a CompletableFuture that is completed exceptionally
with the exception.
> SerializedThrowable will be java.lang.StackOverflowError when
> AsyncLookupFunction throw an exception
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-33933
> URL: https://issues.apache.org/jira/browse/FLINK-33933
> Project: Flink
> Issue Type: Bug
> Environment: tested from 1.16 to 1.18 , the same behavior
> Reporter: KarlManong
> Priority: Minor
>
> Here is a simple example
> {code:java}
> // example
> public class TableA implements LookupTableSource {
> @Nullable
> private final LookupCache cache;
> public TableA(@Nullable LookupCache cache) {
> this.cache = cache;
> }
> @Override
> public LookupRuntimeProvider
> getLookupRuntimeProvider(LookupContext context) {
> FunctionA lookupFunction = new FunctionA();
> if (cache != null) {
> return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
> } else {
> return AsyncLookupFunctionProvider.of(lookupFunction);
> }
> }
> @Override
> public DynamicTableSource copy() {
> return new TableA(cache);
> }
> @Override
> public String asSummaryString() {
> return "Async Table";
> }
> }
> public class LookupFunctionA extends AsyncLookupFunction {
> @Override
> public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
> CompletableFuture<Collection<RowData>> future = new
> CompletableFuture<>();
> future.completeExceptionally(new IOException("request failed"));
> return future;
> }
> }
> {code}
> When using TableA, StackOverflowError occurs
>
> {code:java}
> // code placeholder
> java.lang.StackOverflowError
> at java.base/java.lang.Exception.<init>(Exception.java:66)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:66)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)