Kaixuan-Duan commented on code in PR #3132:
URL: https://github.com/apache/fluss/pull/3132#discussion_r3224023215


##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -136,31 +188,60 @@ public Iterable<LogRecordBatch> fetch(long startOffset, 
long localLogStartOffset
         }
 
         LOG.info(
-                "Found {} remote log segments for table bucket {} from offset 
{} to localLogStartOffset {}",
+                "Found {} remote log segments for table bucket {} from offset 
{} to localLogStartOffset {} "
+                        + "(prefetchNum={}, downloadThreads={})",
                 segments.size(),
                 tableBucket,
                 startOffset,
-                localLogStartOffset);
+                localLogStartOffset,
+                prefetchNum,
+                ((java.util.concurrent.ThreadPoolExecutor) 
downloadExecutor).getCorePoolSize());
 
         RemoteLogBatchIterator iterator =
                 new RemoteLogBatchIterator(segments, startOffset, 
localLogStartOffset);
         this.activeIterator = iterator;
+        // Kick off the initial prefetch window eagerly so that the very first 
advance() can
+        // consume from the ring instead of falling back to a synchronous 
download.
+        iterator.fillPrefetchWindow();
         return () -> iterator;
     }
 
     @Override
     public void close() {
-        // Close any active iterator to release file handles
-        if (activeIterator != null) {
-            activeIterator.close();
-            activeIterator = null;
-        }
-        // Remove the entire "tmp" parent directory to clean up our 
subdirectory as well
-        // as any stale recovery directories left by a previous failed 
recovery.
-        Path tmpDir = tempDir.getParent();
-        if (tmpDir != null && Files.exists(tmpDir)) {
-            LOG.info("Cleaning up remote log recovery tmp dir: {}", tmpDir);
-            deleteDirectoryQuietly(tmpDir.toFile());
+        try {
+            // Close any active iterator to release file handles
+            if (activeIterator != null) {
+                activeIterator.close();
+                activeIterator = null;
+            }
+        } finally {
+            downloadExecutor.shutdownNow();
+
+            boolean terminated = false;
+            try {
+                terminated =
+                        downloadExecutor.awaitTermination(1, 
java.util.concurrent.TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOG.warn(
+                        "Interrupted while waiting for remote log fetcher 
download executor termination for table bucket {}.",
+                        tableBucket,
+                        e);
+            }
+
+            if (terminated) {

Review Comment:
   Good catch. Removed tempDir.getParent() deletion; now close() always 
attempts best-effort cleanup of its own UUID subdirectory regardless of 
executor termination, with a warning log if files are still in use.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to