fresh-borzoni commented on code in PR #3132:
URL: https://github.com/apache/fluss/pull/3132#discussion_r3220334466


##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -333,12 +555,202 @@ private void advance() {
             }
         }
 
+        /**
+         * Obtain the local file for {@code segment}, ideally reusing a 
prefetched download. On
+         * success the ring head is consumed and a refill is triggered, so the 
window stays full as
+         * long as more segments remain.
+         */
+        private File fetchSegmentFile(RemoteLogSegment segment) throws 
IOException {
+            // Invariant: fillPrefetchWindow() and advance() walk the same 
pre-filtered
+            // `segments` list in strict order with identical skip rules; 
consumption is
+            // single-threaded FIFO; prefetchNum >= 1 (Math.max(1, ...) in the 
ctor). So
+            // the ring head is *always* the segment advance() is asking for.
+            int headSlot = nextConsumeIndex % prefetchNum;
+            RemoteLogSegment headSegment = prefetchSegments[headSlot];
+            assert isSameSegmentId(segment, headSegment)
+                    : "prefetch ring head "
+                            + (headSegment == null ? "null" : 
headSegment.remoteLogSegmentId())
+                            + " does not match requested "
+                            + segment.remoteLogSegmentId();
+            Future<File> headFuture = prefetchSlots[headSlot];
+            // Eagerly null the slot refs so the ring never keeps stale 
references even if the
+            // consumer throws below — also releases the RemoteLogSegment for 
GC early.
+            prefetchSlots[headSlot] = null;
+            prefetchSegments[headSlot] = null;
+            nextConsumeIndex++;
+            try {
+                return headFuture.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException(
+                        "Interrupted while waiting for remote log segment 
download: "
+                                + segment.remoteLogSegmentId(),
+                        e);
+            } catch (CancellationException e) {
+                LOG.warn(
+                        "Prefetched segment {} was cancelled, fallback to sync 
download.",
+                        segment.remoteLogSegmentId(),
+                        e);
+                return downloadSegmentWithRetry(segment);
+            } catch (ExecutionException e) {
+                LOG.warn(
+                        "Prefetched segment {} failed even after async 
retries, "
+                                + "fallback to one more synchronous retry 
round.",
+                        segment.remoteLogSegmentId(),
+                        e.getCause());
+                return downloadSegmentWithRetry(segment);
+            } finally {
+                // Window slot released — try to submit the next prefetch 
right away.
+                fillPrefetchWindow();
+            }
+        }
+
+        private boolean isSameSegmentId(RemoteLogSegment left, 
RemoteLogSegment right) {
+            return left != null
+                    && right != null
+                    && 
left.remoteLogSegmentId().equals(right.remoteLogSegmentId());
+        }
+
+        /**
+         * Submit as many download tasks as fit in the window. Non-blocking: 
if the window is full
+         * we stop and wait for the consumer to advance.
+         */
+        private void fillPrefetchWindow() {
+            if (closed) {
+                return;
+            }
+
+            while (nextPrefetchIndex < segments.size()) {
+                RemoteLogSegment segment = segments.get(nextPrefetchIndex);
+
+                // segment entirely before currentOffset — skip and advance 
index.
+                if (segment.remoteLogEndOffset() < currentOffset) {
+                    nextPrefetchIndex++;
+                    continue;
+                }
+                // segment starts at or beyond localLogStartOffset — nothing 
more to prefetch.
+                if (segment.remoteLogStartOffset() >= localLogStartOffset) {
+                    return;
+                }
+
+                // Window full — back off and wait for the consumer to advance.
+                if (prefetchWindowSize() >= prefetchNum) {
+                    return;
+                }
+
+                final RemoteLogSegment target = segment;
+                Future<File> future;
+                try {
+                    // submit(Callable) returns a FutureTask whose 
cancel(true) actually calls
+                    // Thread.interrupt() on the worker, unlike 
CompletableFuture.cancel which
+                    // only flips state. This lets stale-fallback / inject / 
second-fetch paths
+                    // really stop in-flight downloads instead of orphaning 
their files in tempDir.
+                    future =
+                            downloadExecutor.submit(
+                                    (Callable<File>) () -> 
downloadSegmentWithRetry(target));
+                } catch (Throwable submitError) {
+                    // The executor rejected the task (e.g. we're shutting 
down); stop refilling.
+                    // No slot has been claimed yet, so there is nothing to 
roll back.
+                    LOG.debug(
+                            "Failed to submit prefetch for segment {} 
(executor likely shutting down).",
+                            target.remoteLogSegmentId(),
+                            submitError);
+                    return;
+                }
+
+                int slot = nextPrefetchIndex % prefetchNum;
+                prefetchSlots[slot] = future;
+                prefetchSegments[slot] = target;
+                nextPrefetchIndex++;
+                LOG.debug(
+                        "Prefetching remote log segment {} for bucket {} 
(window size={}, free slots={}).",
+                        target.remoteLogSegmentId(),
+                        tableBucket,
+                        prefetchWindowSize(),
+                        prefetchNum - prefetchWindowSize());
+            }
+        }
+
+        /**
+         * Cancel or clean up every entry currently in the window. Used on 
close, iterator failure,
+         * and stale-ring fallback. After this runs the invariant {@code 
nextConsumeIndex ==
+         * nextPrefetchIndex} holds, so the window is empty.
+         */
+        private void drainPrefetchWindow() {
+            while (nextConsumeIndex < nextPrefetchIndex) {
+                int slot = nextConsumeIndex % prefetchNum;
+                Future<File> future = prefetchSlots[slot];
+                prefetchSlots[slot] = null;
+                prefetchSegments[slot] = null;
+                nextConsumeIndex++;
+                if (future.isDone()) {
+                    if (!cleanupCompletedFuture(future)) {
+                        return;
+                    }
+                } else {
+                    if (!future.cancel(true) && 
!cleanupCompletedFuture(future)) {
+                        return;
+                    }
+                }
+            }
+        }
+
+        /**
+         * Drain a future that is (now) known to be completed: pull its file 
out via {@link
+         * Future#get()} and delete it. {@code isDone()} guarantees {@code 
get()} does not block.
+         *
+         * @return {@code true} on a clean drain, {@code false} if this thread 
was interrupted while
+         *     calling {@code get()} (caller should bail out and let close()'s 
shutdownNow path
+         *     reclaim the remaining slots).
+         */
+        private boolean cleanupCompletedFuture(Future<File> future) {
+            try {
+                // isDone() is true so get() will not block; it returns either 
the
+                // successfully downloaded file, or throws 
Cancellation/Execution.
+                cleanupUnusedPrefetchedFile(future.get());
+            } catch (CancellationException | ExecutionException ignored) {
+                // no local file to clean up
+            } catch (InterruptedException e) {
+                // A completed future never actually blocks here, so an 
interrupt on this
+                // thread came from elsewhere. Restore the flag and bail out — 
close()'s
+                // follow-up shutdownNow()/awaitTermination path will reclaim 
whatever's
+                // left in the ring.
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }
+
+        private void cleanupUnusedPrefetchedFile(File prefetchedFile) {
+            if (prefetchedFile == null) {
+                return;
+            }
+
+            try {
+                Files.deleteIfExists(prefetchedFile.toPath());
+            } catch (IOException cleanupException) {
+                LOG.warn(
+                        "Failed to cleanup unused prefetched segment file {} 
for table bucket {}.",
+                        prefetchedFile,
+                        tableBucket,
+                        cleanupException);
+            }
+        }
+
         private void closeCurrentFileLogRecords() {

Review Comment:
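   The new prefetch setting (prefetchNum) bounds the number of segments that are "downloaded but not yet consumed", but consumed segment files are not deleted when we move on to the next segment. closeCurrentFileLogRecords() closes the FileLogRecords, but the underlying .log file stays in tempDir until the fetcher itself is closed. As a result, disk usage during recovery grows with the total number of segments read, not with the prefetch window.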
   
   Could we track the currently opened local segment file and delete it right after closing its FileLogRecords? For example, keep a currentLocalFile field, set it after fetchSegmentFile() returns, and delete it in closeCurrentFileLogRecords().



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -136,31 +188,60 @@ public Iterable<LogRecordBatch> fetch(long startOffset, 
long localLogStartOffset
         }
 
         LOG.info(
-                "Found {} remote log segments for table bucket {} from offset 
{} to localLogStartOffset {}",
+                "Found {} remote log segments for table bucket {} from offset 
{} to localLogStartOffset {} "
+                        + "(prefetchNum={}, downloadThreads={})",
                 segments.size(),
                 tableBucket,
                 startOffset,
-                localLogStartOffset);
+                localLogStartOffset,
+                prefetchNum,
+                ((java.util.concurrent.ThreadPoolExecutor) 
downloadExecutor).getCorePoolSize());
 
         RemoteLogBatchIterator iterator =
                 new RemoteLogBatchIterator(segments, startOffset, 
localLogStartOffset);
         this.activeIterator = iterator;
+        // Kick off the initial prefetch window eagerly so that the very first 
advance() can
+        // consume from the ring instead of falling back to a synchronous 
download.
+        iterator.fillPrefetchWindow();
         return () -> iterator;
     }
 
     @Override
     public void close() {
-        // Close any active iterator to release file handles
-        if (activeIterator != null) {
-            activeIterator.close();
-            activeIterator = null;
-        }
-        // Remove the entire "tmp" parent directory to clean up our 
subdirectory as well
-        // as any stale recovery directories left by a previous failed 
recovery.
-        Path tmpDir = tempDir.getParent();
-        if (tmpDir != null && Files.exists(tmpDir)) {
-            LOG.info("Cleaning up remote log recovery tmp dir: {}", tmpDir);
-            deleteDirectoryQuietly(tmpDir.toFile());
+        try {
+            // Close any active iterator to release file handles
+            if (activeIterator != null) {
+                activeIterator.close();
+                activeIterator = null;
+            }
+        } finally {
+            downloadExecutor.shutdownNow();
+
+            boolean terminated = false;
+            try {
+                terminated =
+                        downloadExecutor.awaitTermination(1, 
java.util.concurrent.TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOG.warn(
+                        "Interrupted while waiting for remote log fetcher 
download executor termination for table bucket {}.",
+                        tableBucket,
+                        e);
+            }
+
+            if (terminated) {

Review Comment:
   This cleanup path seems risky in two ways.
   
   First, if awaitTermination() times out, cleanup is skipped entirely. A single download that does not respond to interruption can therefore leave the recovery directory behind.
   
   Second, tempDir.getParent() is the shared tmp/ directory under the log dir, not this fetcher's own recovery directory. Deleting it removes more than this RemoteLogFetcher owns.
   
   Could we delete tempDir itself rather than its parent, as a best-effort step that runs whether or not the executor terminated? If deletion fails because a file is still open, logging a warning is fine, but we should not skip the cleanup attempt altogether.



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -333,12 +555,202 @@ private void advance() {
             }
         }
 
+        /**
+         * Obtain the local file for {@code segment}, ideally reusing a 
prefetched download. On
+         * success the ring head is consumed and a refill is triggered, so the 
window stays full as
+         * long as more segments remain.
+         */
+        private File fetchSegmentFile(RemoteLogSegment segment) throws 
IOException {
+            // Invariant: fillPrefetchWindow() and advance() walk the same 
pre-filtered
+            // `segments` list in strict order with identical skip rules; 
consumption is
+            // single-threaded FIFO; prefetchNum >= 1 (Math.max(1, ...) in the 
ctor). So
+            // the ring head is *always* the segment advance() is asking for.
+            int headSlot = nextConsumeIndex % prefetchNum;
+            RemoteLogSegment headSegment = prefetchSegments[headSlot];
+            assert isSameSegmentId(segment, headSegment)

Review Comment:
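   This invariant is important enough that I don't think assert is the right guard. Assertions are normally disabled in production. With assertions off, a mismatch would go undetected, and fetchSegmentFile() would silently return the file downloaded for the ring head, which is a different segment from the one requested.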
   Could we replace it with an explicit runtime check? On a mismatch, it would drain the prefetch window and throw an IOException that includes both the requested and the head segment ids.



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -189,16 +294,80 @@ private File downloadSegment(RemoteLogSegment segment) 
throws IOException {
                 segment.remoteLogEndOffset(),
                 localFile);
 
+        boolean success = false;
         try (InputStream inputStream = remoteLogStorage.fetchLogData(segment);
                 OutputStream outputStream = 
Files.newOutputStream(localFile.toPath())) {
             IOUtils.copyBytes(inputStream, outputStream, false);
+            success = true;
         } catch (RemoteStorageException e) {
             throw new IOException(
                     "Failed to download remote log segment: " + 
segment.remoteLogSegmentId(), e);
+        } finally {
+            // Most remote/file InputStreams don't honor Thread.interrupt() 
mid-read: the
+            // call simply returns a normally-completed copy and we'd fall 
through with
+            // success=true while the worker has been interrupted by 
close()/cancel(true).
+            // Treat "interrupt observed during the copy" as a failure for 
cleanup purposes
+            // so we don't leave a stale segment file behind in tempDir.
+            if (!success || Thread.currentThread().isInterrupted()) {

Review Comment:
   In downloadSegment(), if the copy succeeds but the thread has been interrupted, the finally block deletes localFile, yet the method still returns that File object to its caller.
   
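   Usually Future.cancel(true) masks this, because a cancelled Future never hands its result to anyone. However, an interrupt that is not paired with a cancellation can still make this path observable. One example is the interrupt delivered to running workers by downloadExecutor.shutdownNow() in close(). Could we throw after deleting on the interrupted path (e.g. an InterruptedIOException, which is still an IOException) instead of returning a file that may no longer exist?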



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to