fresh-borzoni commented on code in PR #3263:
URL: https://github.com/apache/fluss/pull/3263#discussion_r3236284680


##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java:
##########
@@ -109,20 +109,31 @@ void tryComplete(TableBucket tableBucket) {
                     while (pendings != null && !pendings.isEmpty()) {
                         PendingFetch pendingFetch = pendings.peek();
                         if (pendingFetch.isCompleted()) {
-                            CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
-                            completedFetches.add(completedFetch);
                             pendings.poll();
-                            hasCompleted = true;
+                            try {
+                                CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
+                                completedFetches.add(completedFetch);
+                                hasCompleted = true;
+                            } catch (Throwable t) {
+                                // If toCompletedFetch() fails (e.g. the underlying chunk
+                                // future completed exceptionally), discard this entry so
+                                // the queue is not blocked. The bucket will become fetchable
+                                // again and the server can re-issue a remote fetch.

Review Comment:
   I'm not sure about this: what if the failure is permanent? It would be an
   infinite loop with only a warning. Should we retry with backoff and then
   surface an error in the scanner?
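
A rough sketch of the bounded-retry idea, not a proposal for the actual patch. `FetchRetryTracker` and its methods are hypothetical names that do not exist in Fluss, and a `String` key stands in for `TableBucket`. The catch block would record the failure and get back a delay before the bucket becomes fetchable again. Once the attempt limit is reached, the tracker throws so the scanner can surface the error instead of looping.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: counts consecutive fetch failures per bucket. */
final class FetchRetryTracker {
    private final int maxAttempts;
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    // Keyed by a bucket name here; real code would presumably key by TableBucket.
    private final Map<String, Integer> failures = new HashMap<>();

    FetchRetryTracker(int maxAttempts, long initialBackoffMs, long maxBackoffMs) {
        this.maxAttempts = maxAttempts;
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /**
     * Records one failure and returns the delay in milliseconds before the next
     * attempt. Throws once maxAttempts consecutive failures have been recorded,
     * so the caller can propagate the error rather than retry forever.
     */
    long onFailure(String bucket, Throwable cause) {
        int attempts = failures.merge(bucket, 1, Integer::sum);
        if (attempts >= maxAttempts) {
            failures.remove(bucket);
            throw new IllegalStateException(
                    "Fetch for bucket " + bucket + " failed " + attempts + " times", cause);
        }
        // Exponential backoff: initial * 2^(attempts - 1), capped at maxBackoffMs.
        // The shift is bounded so the multiplication cannot overflow for sane inputs.
        long backoff = initialBackoffMs << Math.min(attempts - 1, 20);
        return Math.min(backoff, maxBackoffMs);
    }

    /** Resets the failure count after a fetch is converted successfully. */
    void onSuccess(String bucket) {
        failures.remove(bucket);
    }
}
```

In `tryComplete`, the catch block would call `onFailure(...)` and the success path would call `onSuccess(...)`. Where the delay is honored (for example, by keeping the bucket unfetchable until it elapses) is left open here.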


