[
https://issues.apache.org/jira/browse/FLINK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801073#comment-17801073
]
KarlManong edited comment on FLINK-33933 at 12/29/23 1:06 AM:
--------------------------------------------------------------
[~martijnvisser] yes, when exception occurs, We'll got a
completedExceptionally future,
{code:java}
@Override
public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
try {
// Perform the lookup and create a collection of RowData objects
Collection<RowData> result = new ArrayList<>();
// TODO: Add your lookup logic here to populate the result collection
// Return the result as a completed future
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
// If an exception occurs during the lookup, complete the future
exceptionally with the exception
/*
* code run to here
*/
CompletableFuture<Collection<RowData>> future = new
CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
} {code}
then Flink fatal error with a StackOverflowError
was (Author: karlmanong):
[~martijnvisser] yes, when exception occurs, We'll got a
completedExceptionally future,
{code:java}
@Override
public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
try {
// Perform the lookup and create a collection of RowData objects
Collection<RowData> result = new ArrayList<>();
// TODO: Add your lookup logic here to populate the result collection
// Return the result as a completed future
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
// If an exception occurs during the lookup, complete the future
exceptionally with the exception
// code run to here
CompletableFuture<Collection<RowData>> future = new
CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
} {code}
then Flink fatal error with a StackOverflowError
> SerializedThrowable will be java.lang.StackOverflowError when
> AsyncLookupFunction throw an exception
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-33933
> URL: https://issues.apache.org/jira/browse/FLINK-33933
> Project: Flink
> Issue Type: Bug
> Environment: tested from 1.16 to 1.18 , the same behavior
> Reporter: KarlManong
> Priority: Minor
>
> Here is a simple example
> {code:java}
> // example
> public class TableA implements LookupTableSource {
> @Nullable
> private final LookupCache cache;
> public TableA(@Nullable LookupCache cache) {
> this.cache = cache;
> }
> @Override
> public LookupRuntimeProvider
> getLookupRuntimeProvider(LookupContext context) {
> FunctionA lookupFunction = new FunctionA();
> if (cache != null) {
> return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
> } else {
> return AsyncLookupFunctionProvider.of(lookupFunction);
> }
> }
> @Override
> public DynamicTableSource copy() {
> return new TableA(cache);
> }
> @Override
> public String asSummaryString() {
> return "Async Table";
> }
> }
> public class LookupFunctionA extends AsyncLookupFunction {
> @Override
> public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
> CompletableFuture<Collection<RowData>> future = new
> CompletableFuture<>();
> future.completeExceptionally(new IOException("request failed"));
> return future;
> }
> }
> {code}
> When using TableA, StackOverflowError occurs
>
> {code:java}
> // code placeholder
> java.lang.StackOverflowError
> at java.base/java.lang.Exception.<init>(Exception.java:66)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:66)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)