Kaixuan-Duan commented on code in PR #3132:
URL: https://github.com/apache/fluss/pull/3132#discussion_r3224017718


##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -333,12 +555,202 @@ private void advance() {
             }
         }
 
+        /**
+         * Obtain the local file for {@code segment}, ideally reusing a 
prefetched download. On
+         * success the ring head is consumed and a refill is triggered, so the 
window stays full as
+         * long as more segments remain.
+         */
+        private File fetchSegmentFile(RemoteLogSegment segment) throws 
IOException {
+            // Invariant: fillPrefetchWindow() and advance() walk the same 
pre-filtered
+            // `segments` list in strict order with identical skip rules; 
consumption is
+            // single-threaded FIFO; prefetchNum >= 1 (Math.max(1, ...) in the 
ctor). So
+            // the ring head is *always* the segment advance() is asking for.
+            int headSlot = nextConsumeIndex % prefetchNum;
+            RemoteLogSegment headSegment = prefetchSegments[headSlot];
+            assert isSameSegmentId(segment, headSegment)
+                    : "prefetch ring head "
+                            + (headSegment == null ? "null" : 
headSegment.remoteLogSegmentId())
+                            + " does not match requested "
+                            + segment.remoteLogSegmentId();
+            Future<File> headFuture = prefetchSlots[headSlot];
+            // Eagerly null the slot refs so the ring never keeps stale 
references even if the
+            // consumer throws below — also releases the RemoteLogSegment for 
GC early.
+            prefetchSlots[headSlot] = null;
+            prefetchSegments[headSlot] = null;
+            nextConsumeIndex++;
+            try {
+                return headFuture.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException(
+                        "Interrupted while waiting for remote log segment 
download: "
+                                + segment.remoteLogSegmentId(),
+                        e);
+            } catch (CancellationException e) {
+                LOG.warn(
+                        "Prefetched segment {} was cancelled, fallback to sync 
download.",
+                        segment.remoteLogSegmentId(),
+                        e);
+                return downloadSegmentWithRetry(segment);
+            } catch (ExecutionException e) {
+                LOG.warn(
+                        "Prefetched segment {} failed even after async 
retries, "
+                                + "fallback to one more synchronous retry 
round.",
+                        segment.remoteLogSegmentId(),
+                        e.getCause());
+                return downloadSegmentWithRetry(segment);
+            } finally {
+                // Window slot released — try to submit the next prefetch 
right away.
+                fillPrefetchWindow();
+            }
+        }
+
+        private boolean isSameSegmentId(RemoteLogSegment left, 
RemoteLogSegment right) {
+            return left != null
+                    && right != null
+                    && 
left.remoteLogSegmentId().equals(right.remoteLogSegmentId());
+        }
+
+        /**
+         * Submit as many download tasks as fit in the window. Non-blocking: 
if the window is full
+         * we stop and wait for the consumer to advance.
+         */
+        private void fillPrefetchWindow() {
+            if (closed) {
+                return;
+            }
+
+            while (nextPrefetchIndex < segments.size()) {
+                RemoteLogSegment segment = segments.get(nextPrefetchIndex);
+
+                // segment entirely before currentOffset — skip and advance 
index.
+                if (segment.remoteLogEndOffset() < currentOffset) {
+                    nextPrefetchIndex++;
+                    continue;
+                }
+                // segment starts at or beyond localLogStartOffset — nothing 
more to prefetch.
+                if (segment.remoteLogStartOffset() >= localLogStartOffset) {
+                    return;
+                }
+
+                // Window full — back off and wait for the consumer to advance.
+                if (prefetchWindowSize() >= prefetchNum) {
+                    return;
+                }
+
+                final RemoteLogSegment target = segment;
+                Future<File> future;
+                try {
+                    // submit(Callable) returns a FutureTask whose 
cancel(true) actually calls
+                    // Thread.interrupt() on the worker, unlike 
CompletableFuture.cancel which
+                    // only flips state. This lets stale-fallback / inject / 
second-fetch paths
+                    // really stop in-flight downloads instead of orphaning 
their files in tempDir.
+                    future =
+                            downloadExecutor.submit(
+                                    (Callable<File>) () -> 
downloadSegmentWithRetry(target));
+                } catch (Throwable submitError) {
+                    // The executor rejected the task (e.g. we're shutting 
down); stop refilling.
+                    // No slot has been claimed yet, so there is nothing to 
roll back.
+                    LOG.debug(
+                            "Failed to submit prefetch for segment {} 
(executor likely shutting down).",
+                            target.remoteLogSegmentId(),
+                            submitError);
+                    return;
+                }
+
+                int slot = nextPrefetchIndex % prefetchNum;
+                prefetchSlots[slot] = future;
+                prefetchSegments[slot] = target;
+                nextPrefetchIndex++;
+                LOG.debug(
+                        "Prefetching remote log segment {} for bucket {} 
(window size={}, free slots={}).",
+                        target.remoteLogSegmentId(),
+                        tableBucket,
+                        prefetchWindowSize(),
+                        prefetchNum - prefetchWindowSize());
+            }
+        }
+
+        /**
+         * Cancel or clean up every entry currently in the window. Used on 
close, iterator failure,
+         * and stale-ring fallback. After this runs the invariant {@code 
nextConsumeIndex ==
+         * nextPrefetchIndex} holds, so the window is empty.
+         */
+        private void drainPrefetchWindow() {
+            while (nextConsumeIndex < nextPrefetchIndex) {
+                int slot = nextConsumeIndex % prefetchNum;
+                Future<File> future = prefetchSlots[slot];
+                prefetchSlots[slot] = null;
+                prefetchSegments[slot] = null;
+                nextConsumeIndex++;
+                if (future.isDone()) {
+                    if (!cleanupCompletedFuture(future)) {
+                        return;
+                    }
+                } else {
+                    if (!future.cancel(true) && 
!cleanupCompletedFuture(future)) {
+                        return;
+                    }
+                }
+            }
+        }
+
+        /**
+         * Drain a future that is (now) known to be completed: pull its file 
out via {@link
+         * Future#get()} and delete it. {@code isDone()} guarantees {@code 
get()} does not block.
+         *
+         * @return {@code true} on a clean drain, {@code false} if this thread 
was interrupted while
+         *     calling {@code get()} (caller should bail out and let close()'s 
shutdownNow path
+         *     reclaim the remaining slots).
+         */
+        private boolean cleanupCompletedFuture(Future<File> future) {
+            try {
+                // isDone() is true so get() will not block; it returns either 
the
+                // successfully downloaded file, or throws 
Cancellation/Execution.
+                cleanupUnusedPrefetchedFile(future.get());
+            } catch (CancellationException | ExecutionException ignored) {
+                // no local file to clean up
+            } catch (InterruptedException e) {
+                // A completed future never actually blocks here, so an 
interrupt on this
+                // thread came from elsewhere. Restore the flag and bail out — 
close()'s
+                // follow-up shutdownNow()/awaitTermination path will reclaim 
whatever's
+                // left in the ring.
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }
+
+        private void cleanupUnusedPrefetchedFile(File prefetchedFile) {
+            if (prefetchedFile == null) {
+                return;
+            }
+
+            try {
+                Files.deleteIfExists(prefetchedFile.toPath());
+            } catch (IOException cleanupException) {
+                LOG.warn(
+                        "Failed to cleanup unused prefetched segment file {} 
for table bucket {}.",
+                        prefetchedFile,
+                        tableBucket,
+                        cleanupException);
+            }
+        }
+
         private void closeCurrentFileLogRecords() {

Review Comment:
   Thanks for the suggestion. I added a `currentLocalFile` field to track the
currently opened segment file; `closeCurrentFileLogRecords()` now deletes that
file after closing its `FileLogRecords`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to