fresh-borzoni commented on code in PR #3263:
URL: https://github.com/apache/fluss/pull/3263#discussion_r3236284680
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java:
##########
@@ -109,20 +109,31 @@ void tryComplete(TableBucket tableBucket) {
while (pendings != null && !pendings.isEmpty()) {
PendingFetch pendingFetch = pendings.peek();
if (pendingFetch.isCompleted()) {
- CompletedFetch completedFetch =
pendingFetch.toCompletedFetch();
- completedFetches.add(completedFetch);
pendings.poll();
- hasCompleted = true;
+ try {
+ CompletedFetch completedFetch =
pendingFetch.toCompletedFetch();
+ completedFetches.add(completedFetch);
+ hasCompleted = true;
+ } catch (Throwable t) {
+ // If toCompletedFetch() fails (e.g. the
underlying chunk
+ // future completed exceptionally), discard
this entry so
+ // the queue is not blocked. The bucket will
become fetchable
+ // again and the server can re-issue a remote
fetch.
Review Comment:
I'm not sure: what if the issue is permanent? It would be a infitite loop
with only warning. Should we have retry with backoff and then surface an error
in the scanner?
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java:
##########
@@ -109,20 +109,31 @@ void tryComplete(TableBucket tableBucket) {
while (pendings != null && !pendings.isEmpty()) {
PendingFetch pendingFetch = pendings.peek();
if (pendingFetch.isCompleted()) {
- CompletedFetch completedFetch =
pendingFetch.toCompletedFetch();
- completedFetches.add(completedFetch);
pendings.poll();
- hasCompleted = true;
+ try {
+ CompletedFetch completedFetch =
pendingFetch.toCompletedFetch();
+ completedFetches.add(completedFetch);
+ hasCompleted = true;
+ } catch (Throwable t) {
+ // If toCompletedFetch() fails (e.g. the
underlying chunk
+ // future completed exceptionally), discard
this entry so
+ // the queue is not blocked. The bucket will
become fetchable
+ // again and the server can re-issue a remote
fetch.
Review Comment:
nit: I'm not sure: what if the issue is permanent? It would be a infitite
loop with only warning. Should we have retry with backoff and then surface an
error in the scanner?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]