fresh-borzoni commented on code in PR #3263:
URL: https://github.com/apache/fluss/pull/3263#discussion_r3236133577


##########
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java:
##########
@@ -338,16 +420,320 @@ private static List<RemoteLogSegment> buildRemoteLogSegmentList(
         return remoteLogSegmentList;
     }
 
+    /**
+     * Tests chunked download with a small chunk size, verifying that: 1. A large segment is split
+     * into multiple chunks. 2. Chunks can be consumed while subsequent chunks are still being
+     * downloaded (边读边下载). 3. Flow control (maxPrefetchChunks) pauses downloading when unconsumed

Review Comment:
   nit: mix of languages here ("边读边下载" is Chinese for "reading while downloading")



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java:
##########


Review Comment:
   A request that's been polled and partially read but is paused is in neither segmentsToFetch nor continuationQueue.
   
   close() only walks those two queues, so the open FSDataInputStream and the remote-chunk tmp file are left behind.
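A minimal sketch of one way to close this gap, assuming a hypothetical registry (`ActiveRequestRegistry` and `TrackedRequest` are illustrative names, not classes in the PR). It holds every request from the moment its reader is opened until cleanup runs, so close() no longer depends on which queue, if any, currently holds the request. `TrackedRequest.close()` stands in for closing the FSDataInputStream and deleting the tmp file.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a request that owns an open stream and a tmp file.
final class TrackedRequest implements Closeable {
    final String segmentId;
    private volatile boolean closed;

    TrackedRequest(String segmentId) {
        this.segmentId = segmentId;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        // Real code would close the FSDataInputStream and delete the remote-chunk tmp file here.
        closed = true;
    }
}

// Hypothetical registry: tracks requests independently of segmentsToFetch/continuationQueue,
// so a paused request (in neither queue) is still reachable from close().
final class ActiveRequestRegistry implements Closeable {
    private final Set<TrackedRequest> active = ConcurrentHashMap.newKeySet();

    // Call when the reader for a request is first opened.
    void register(TrackedRequest request) {
        active.add(request);
    }

    // Call from the normal cleanup path once a segment is fully consumed.
    void unregister(TrackedRequest request) {
        active.remove(request);
    }

    int activeCount() {
        return active.size();
    }

    @Override
    public void close() {
        // Copy first so concurrent unregister() calls do not interfere with iteration.
        List<TrackedRequest> snapshot = new ArrayList<>(active);
        for (TrackedRequest request : snapshot) {
            request.close();
            active.remove(request);
        }
    }
}
```

Since the download thread may still be inside processChunkRead when close() runs, the real close path would presumably need the same synchronized(request) / cleanedUp guard that cleanupRequest already uses.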



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloadFuture.java:
##########
@@ -17,47 +17,69 @@
 
 package org.apache.fluss.client.table.scanner.log;
 
-import org.apache.fluss.exception.FlussRuntimeException;
-import org.apache.fluss.record.FileLogRecords;
+import org.apache.fluss.record.LogRecords;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
-/** Represents the future of a remote log download request. */
+/**
+ * Represents the future of a single chunk read from a remote log segment. Each chunk is delivered
+ * as a {@link LogRecords} via a {@link CompletableFuture}.
+ */
 public class RemoteLogDownloadFuture {
 
-    private final CompletableFuture<File> logFileFuture;
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteLogDownloadFuture.class);
+
+    private final CompletableFuture<LogRecords> chunkFuture;
     private final Runnable recycleCallback;
+    private Consumer<RemoteLogDownloadFuture> nextChunkCallback;

Review Comment:
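   volatile? It's installed by the caller after requestRemoteLog() returns and read in buildNextChunkFuture(), which can run on the download thread.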



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java:
##########
@@ -40,187 +41,342 @@
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
-import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly;
-import static org.apache.fluss.utils.FlussPaths.LOG_FILE_SUFFIX;
 import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir;
 import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentFile;
 
-/** Downloader to read remote log files to local disk. */
+/**
+ * Reads remote log segment files in chunks via streaming I/O. Each chunk (configurable via {@code
+ * client.scanner.remote-log.chunk-size}, default 8 MB) is read from the remote filesystem, appended
+ * to a local temporary file, and delivered as a {@link FileLogRecords} slice. This reduces JVM heap
+ * pressure by leveraging OS page cache.
+ *
+ * <p>Flow control: each segment has at most {@code maxPrefetchChunks} unconsumed chunks. The
+ * downloader pauses when this limit is reached and resumes when chunks are consumed.
+ */
 @ThreadSafe
 @Internal
 public class RemoteLogDownloader implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteLogDownloader.class);
 
     private static final long POLL_TIMEOUT = 5000L;
 
-    private final Path localLogDir;
-
     /**
      * A queue to hold the remote log segment files to be fetched. The queue is ordered by the
      * max_timestamp of the remote log segment. So we download the remote log segments from the
      * older to the newer.
      */
     private final PriorityBlockingQueue<RemoteLogDownloadRequest> segmentsToFetch;
 
-    private final BlockingQueue<RemoteLogSegment> segmentsToRecycle;
+    /** Queue for continuation chunks of segments that are already being streamed. */
+    private final BlockingQueue<RemoteLogDownloadRequest> continuationQueue;
 
     private final Semaphore prefetchSemaphore;
 
     private final DownloadRemoteLogThread downloadThread;
 
-    private final RemoteFileDownloader remoteFileDownloader;
-
     private final ScannerMetricGroup scannerMetricGroup;
 
     private final long pollTimeout;
 
+    private final File tmpDir;
+
+    private final int chunkSize;
+
+    private final int maxPrefetchChunks;
+
     public RemoteLogDownloader(
-            TablePath tablePath,
-            Configuration conf,
-            RemoteFileDownloader remoteFileDownloader,
-            ScannerMetricGroup scannerMetricGroup) {
+            TablePath tablePath, Configuration conf, ScannerMetricGroup scannerMetricGroup) {
         // default we give a 5s long interval to avoid frequent loop
-        this(tablePath, conf, remoteFileDownloader, scannerMetricGroup, POLL_TIMEOUT);
+        this(tablePath, conf, scannerMetricGroup, POLL_TIMEOUT);
     }
 
     @VisibleForTesting
     RemoteLogDownloader(
             TablePath tablePath,
             Configuration conf,
-            RemoteFileDownloader remoteFileDownloader,
             ScannerMetricGroup scannerMetricGroup,
             long pollTimeout) {
         this.segmentsToFetch = new PriorityBlockingQueue<>();
-        this.segmentsToRecycle = new LinkedBlockingQueue<>();
-        this.remoteFileDownloader = remoteFileDownloader;
+        this.continuationQueue = new LinkedBlockingQueue<>();
         this.scannerMetricGroup = scannerMetricGroup;
         this.pollTimeout = pollTimeout;
         this.prefetchSemaphore =
                 new Semaphore(conf.getInt(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM));
-        // The local tmp dir to store the fetched log segment files,
-        // add UUID to avoid conflict between tasks.
-        this.localLogDir =
-                Paths.get(
-                        conf.get(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR),
-                        "remote-logs-" + UUID.randomUUID());
+        this.chunkSize =
+                (int) conf.get(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE).getBytes();
+        this.maxPrefetchChunks =
+                conf.getInt(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS);
+        this.tmpDir = new File(conf.getString(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR));
+        if (!tmpDir.exists() && !tmpDir.mkdirs()) {
+            throw new IllegalStateException(
+                    "Failed to create temp directory for remote log chunks: " + tmpDir);
+        }
         this.downloadThread = new DownloadRemoteLogThread(tablePath);
     }
 
     public void start() {
         downloadThread.start();
     }
 
-    /** Request to fetch remote log segment to local. This method is non-blocking. */
-    public RemoteLogDownloadFuture requestRemoteLog(FsPath logTabletDir, RemoteLogSegment segment) {
-        RemoteLogDownloadRequest request = new RemoteLogDownloadRequest(segment, logTabletDir);
+    /**
+     * Request to read a remote log segment in chunks starting from the given position. This method
+     * is non-blocking and returns a future for the first chunk.
+     */
+    public RemoteLogDownloadFuture requestRemoteLog(

Review Comment:
   I think we introduced a race:
   requestRemoteLog() adds the request to segmentsToFetch and returns. The caller then installs the next-chunk callback on the returned future. But the download thread is already running, so it can poll the request, read chunk 1, and call tryScheduleNextChunk(), which builds chunk 2's future by copying request.downloadFuture.getNextChunkCallback(), which is still null at that moment. Chunk 2 is read and completed, but nothing is listening, so the bucket silently stops at chunk 1.
   Probably we can install the callback before publishing the request.
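A minimal sketch of that ordering, using hypothetical simplified types (`SketchFuture`, `SketchRequest`, `SketchDownloader`), not the real Fluss classes: the callback becomes a parameter of requestRemoteLog and is set on the future before the request is added to segmentsToFetch. The callback field is also volatile, which covers the visibility question on nextChunkCallback; volatile on its own would not fix this ordering race.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical simplified future; only the callback handling is modeled.
final class SketchFuture {
    // Written by the caller, read by the download thread.
    private volatile Consumer<SketchFuture> nextChunkCallback;

    void setNextChunkCallback(Consumer<SketchFuture> callback) {
        this.nextChunkCallback = callback;
    }

    Consumer<SketchFuture> getNextChunkCallback() {
        return nextChunkCallback;
    }
}

// Hypothetical simplified request.
final class SketchRequest {
    volatile SketchFuture downloadFuture;
}

final class SketchDownloader {
    final BlockingQueue<SketchRequest> segmentsToFetch = new LinkedBlockingQueue<>();

    // The callback is installed before the request is published. BlockingQueue guarantees
    // that writes made before add() are visible to the thread that later polls the request.
    SketchFuture requestRemoteLog(Consumer<SketchFuture> nextChunkCallback) {
        SketchFuture future = new SketchFuture();
        future.setNextChunkCallback(nextChunkCallback);
        SketchRequest request = new SketchRequest();
        request.downloadFuture = future;
        segmentsToFetch.add(request); // publish last
        return future;
    }
}
```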



##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1271,6 +1271,24 @@ public class ConfigOptions {
                             "The number of remote log segments to keep in local temp file for LogScanner, "
                                     + "which download from remote storage. The default setting is 4.");
 
+    public static final ConfigOption<MemorySize> CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE =
+            key("client.scanner.remote-log.chunk-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("8mb"))
+                    .withDescription(
+                            "The size of each chunk when downloading remote log segments. "
+                                    + "A larger chunk size reduces the number of remote I/O requests but "
+                                    + "increases memory usage per chunk read. The default setting is 8MB.");
+
+    public static final ConfigOption<Integer> CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS =

Review Comment:
   I wonder what happens if it's 0.
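For reference, the pause check in tryScheduleNextChunk is `unconsumed < maxPrefetchChunks`. With 0 that condition can never be true, so it looks like a segment would stall after its first chunk, and because the segment is never exhausted, its prefetch permit would not be released either. A minimal sketch of a guard, using a hypothetical `PrefetchGate` helper (not in the PR) that mirrors the check and rejects values below 1 up front:

```java
// Hypothetical helper mirroring the flow-control check in tryScheduleNextChunk.
final class PrefetchGate {
    private PrefetchGate() {}

    // Same condition as `unconsumed < maxPrefetchChunks` in the PR.
    static boolean canScheduleNext(int chunksWritten, int chunksConsumed, int maxPrefetchChunks) {
        int unconsumed = chunksWritten - chunksConsumed;
        return unconsumed < maxPrefetchChunks;
    }

    // Fail fast at construction time instead of silently stalling at runtime.
    static int validateMaxPrefetchChunks(int maxPrefetchChunks) {
        if (maxPrefetchChunks < 1) {
            throw new IllegalArgumentException(
                    "CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS must be >= 1, but was "
                            + maxPrefetchChunks);
        }
        return maxPrefetchChunks;
    }
}
```

In the downloader constructor this would presumably wrap the existing conf.getInt(...) call for CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS.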



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java:
##########
@@ -109,20 +109,31 @@ void tryComplete(TableBucket tableBucket) {
                     while (pendings != null && !pendings.isEmpty()) {
                         PendingFetch pendingFetch = pendings.peek();
                         if (pendingFetch.isCompleted()) {
-                            CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
-                            completedFetches.add(completedFetch);
                             pendings.poll();
-                            hasCompleted = true;
+                            try {
+                                CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
+                                completedFetches.add(completedFetch);
+                                hasCompleted = true;
+                            } catch (Throwable t) {
+                                // If toCompletedFetch() fails (e.g. the underlying chunk
+                                // future completed exceptionally), discard this entry so
+                                // the queue is not blocked. The bucket will become fetchable
+                                // again and the server can re-issue a remote fetch.

Review Comment:
   I'm not sure: what if the issue is permanent? It would be a hot loop with only a warning. Should we retry with backoff and then surface an error in the scanner?
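A minimal sketch of what bounded retry could look like, assuming a hypothetical per-bucket `RemoteFetchRetryPolicy` (not an existing Fluss class): each consecutive failure doubles the delay up to a cap, and after `maxAttempts` consecutive failures the error is thrown so the scanner can surface it instead of looping. The numbers in the test are illustrative only.

```java
// Hypothetical per-bucket retry policy for failed remote chunk fetches.
final class RemoteFetchRetryPolicy {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private final int maxAttempts;
    private int consecutiveFailures;

    RemoteFetchRetryPolicy(long initialBackoffMs, long maxBackoffMs, int maxAttempts) {
        if (initialBackoffMs <= 0 || maxBackoffMs < initialBackoffMs || maxAttempts < 1) {
            throw new IllegalArgumentException("invalid retry settings");
        }
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.maxAttempts = maxAttempts;
    }

    /**
     * Records a failure and returns how long to wait before re-issuing the fetch.
     * Throws once maxAttempts consecutive failures have been seen.
     */
    long onFailure(Throwable cause) {
        consecutiveFailures++;
        if (consecutiveFailures >= maxAttempts) {
            throw new IllegalStateException(
                    "Remote log fetch failed " + consecutiveFailures + " times in a row", cause);
        }
        // initialBackoffMs * 2^(failures - 1); the shift is capped so it cannot overflow
        // for realistic initial values.
        int shift = Math.min(consecutiveFailures - 1, 20);
        long backoff = initialBackoffMs << shift;
        return Math.min(backoff, maxBackoffMs);
    }

    /** Resets the counter after a chunk is delivered successfully. */
    void onSuccess() {
        consecutiveFailures = 0;
    }
}
```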



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java:
##########
@@ -40,187 +41,342 @@
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
-import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly;
-import static org.apache.fluss.utils.FlussPaths.LOG_FILE_SUFFIX;
 import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir;
 import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentFile;
 
-/** Downloader to read remote log files to local disk. */
+/**
+ * Reads remote log segment files in chunks via streaming I/O. Each chunk (configurable via {@code
+ * client.scanner.remote-log.chunk-size}, default 8 MB) is read from the remote filesystem, appended
+ * to a local temporary file, and delivered as a {@link FileLogRecords} slice. This reduces JVM heap
+ * pressure by leveraging OS page cache.
+ *
+ * <p>Flow control: each segment has at most {@code maxPrefetchChunks} unconsumed chunks. The
+ * downloader pauses when this limit is reached and resumes when chunks are consumed.
+ */
 @ThreadSafe
 @Internal
 public class RemoteLogDownloader implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteLogDownloader.class);
 
     private static final long POLL_TIMEOUT = 5000L;
 
-    private final Path localLogDir;
-
     /**
      * A queue to hold the remote log segment files to be fetched. The queue is ordered by the
      * max_timestamp of the remote log segment. So we download the remote log segments from the
      * older to the newer.
      */
     private final PriorityBlockingQueue<RemoteLogDownloadRequest> segmentsToFetch;
 
-    private final BlockingQueue<RemoteLogSegment> segmentsToRecycle;
+    /** Queue for continuation chunks of segments that are already being streamed. */
+    private final BlockingQueue<RemoteLogDownloadRequest> continuationQueue;
 
     private final Semaphore prefetchSemaphore;
 
     private final DownloadRemoteLogThread downloadThread;
 
-    private final RemoteFileDownloader remoteFileDownloader;
-
     private final ScannerMetricGroup scannerMetricGroup;
 
     private final long pollTimeout;
 
+    private final File tmpDir;
+
+    private final int chunkSize;
+
+    private final int maxPrefetchChunks;
+
     public RemoteLogDownloader(
-            TablePath tablePath,
-            Configuration conf,
-            RemoteFileDownloader remoteFileDownloader,
-            ScannerMetricGroup scannerMetricGroup) {
+            TablePath tablePath, Configuration conf, ScannerMetricGroup scannerMetricGroup) {
         // default we give a 5s long interval to avoid frequent loop
-        this(tablePath, conf, remoteFileDownloader, scannerMetricGroup, POLL_TIMEOUT);
+        this(tablePath, conf, scannerMetricGroup, POLL_TIMEOUT);
     }
 
     @VisibleForTesting
     RemoteLogDownloader(
             TablePath tablePath,
             Configuration conf,
-            RemoteFileDownloader remoteFileDownloader,
             ScannerMetricGroup scannerMetricGroup,
             long pollTimeout) {
         this.segmentsToFetch = new PriorityBlockingQueue<>();
-        this.segmentsToRecycle = new LinkedBlockingQueue<>();
-        this.remoteFileDownloader = remoteFileDownloader;
+        this.continuationQueue = new LinkedBlockingQueue<>();
         this.scannerMetricGroup = scannerMetricGroup;
         this.pollTimeout = pollTimeout;
         this.prefetchSemaphore =
                 new Semaphore(conf.getInt(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM));
-        // The local tmp dir to store the fetched log segment files,
-        // add UUID to avoid conflict between tasks.
-        this.localLogDir =
-                Paths.get(
-                        conf.get(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR),
-                        "remote-logs-" + UUID.randomUUID());
+        this.chunkSize =
+                (int) conf.get(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE).getBytes();
+        this.maxPrefetchChunks =
+                conf.getInt(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS);
+        this.tmpDir = new File(conf.getString(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR));
+        if (!tmpDir.exists() && !tmpDir.mkdirs()) {
+            throw new IllegalStateException(
+                    "Failed to create temp directory for remote log chunks: " + tmpDir);
+        }
         this.downloadThread = new DownloadRemoteLogThread(tablePath);
     }
 
     public void start() {
         downloadThread.start();
     }
 
-    /** Request to fetch remote log segment to local. This method is non-blocking. */
-    public RemoteLogDownloadFuture requestRemoteLog(FsPath logTabletDir, RemoteLogSegment segment) {
-        RemoteLogDownloadRequest request = new RemoteLogDownloadRequest(segment, logTabletDir);
+    /**
+     * Request to read a remote log segment in chunks starting from the given position. This method
+     * is non-blocking and returns a future for the first chunk.
+     */
+    public RemoteLogDownloadFuture requestRemoteLog(
+            FsPath logTabletDir, RemoteLogSegment segment, int startPosition) {
+        CompletableFuture<LogRecords> chunkFuture = new CompletableFuture<>();
+        RemoteLogDownloadRequest request =
+                new RemoteLogDownloadRequest(segment, logTabletDir, startPosition, chunkFuture);
+        RemoteLogDownloadFuture downloadFuture =
+                new RemoteLogDownloadFuture(chunkFuture, () -> onChunkConsumed(request));
+        // Assign downloadFuture before publishing the request to segmentsToFetch, so that the
+        // download thread never sees a null downloadFuture when reading request.downloadFuture
+        // inside createAndQueueNextChunk.
+        request.downloadFuture = downloadFuture;
         segmentsToFetch.add(request);
-        return new RemoteLogDownloadFuture(request.future, () -> recycleRemoteLog(segment));
+        return downloadFuture;
+    }
+
+    /**
+     * Called when a chunk has been consumed (drained). Increments the consumed counter and attempts
+     * to schedule the next chunk download or cleanup.
+     */
+    private void onChunkConsumed(RemoteLogDownloadRequest request) {
+        request.chunksConsumed.incrementAndGet();
+        tryScheduleNextChunk(request);
+    }
+
+    /**
+     * Core scheduling logic. Called from both the download thread (after writing a chunk) and the
+     * consumer thread (after consuming a chunk). Uses synchronized(request) to avoid race
+     * conditions between the two threads.
+     *
+     * <p><b>Lock ordering note:</b> To avoid deadlock with {@code LogFetchBuffer}'s internal lock
+     * (which may call {@code drain()} → {@code recycleCallback} → this method), we must never
+     * invoke external callbacks while holding {@code synchronized(request)}. All callbacks are
+     * captured inside the lock and invoked <em>after</em> releasing it.
+     */
+    private void tryScheduleNextChunk(RemoteLogDownloadRequest request) {
+        // Capture any pending callback to fire outside the lock.
+        Consumer<RemoteLogDownloadFuture> pendingCallback = null;
+        RemoteLogDownloadFuture pendingNextFuture = null;
+        boolean shouldCleanupAndRelease = false;
+
+        synchronized (request) {
+            if (request.queuedForContinuation || request.cleanedUp) {
+                return;
+            }
+
+            boolean exhausted = request.reader == null || request.reader.isExhausted();
+
+            if (!exhausted) {
+                int unconsumed = request.chunksWritten.get() - request.chunksConsumed.get();
+                if (unconsumed < maxPrefetchChunks) {
+                    // Build the next future inside the lock, but defer the callback invocation.
+                    RemoteLogDownloadFuture nextFuture = buildNextChunkFuture(request);
+                    pendingCallback = nextFuture != null ? nextFuture.getNextChunkCallback() : null;
+                    pendingNextFuture = nextFuture;
+                    request.queuedForContinuation = true;
+                    // Re-queue as a continuation (higher priority than new segments).
+                    continuationQueue.add(request);
+                }
+                // else: too many unconsumed chunks, pause downloading
+            } else if (request.chunksConsumed.get() >= request.chunksWritten.get()) {
+                // All chunks consumed and segment exhausted: cleanup
+                cleanupRequest(request);
+                shouldCleanupAndRelease = true;
+            }
+        }
+
+        // Fire the external callback outside the lock to prevent lock-ordering deadlocks.
+        if (pendingCallback != null && pendingNextFuture != null) {
+            pendingCallback.accept(pendingNextFuture);
+        }
+        if (shouldCleanupAndRelease) {
+            prefetchSemaphore.release();
+        }
     }
 
     /**
-     * Recycle the consumed remote log. The removal of the log file is async in the {@link
-     * #downloadThread}.
+     * Builds a new {@link RemoteLogDownloadFuture} for the next chunk and updates {@code
+     * request.chunkFuture} and {@code request.downloadFuture}. Must be called within {@code
+     * synchronized(request)}. Does NOT invoke any external callbacks.
+     *
+     * @return the newly created future, with the inherited {@code nextChunkCallback} already set.
      */
-    void recycleRemoteLog(RemoteLogSegment segment) {
-        segmentsToRecycle.add(segment);
-        prefetchSemaphore.release();
+    private RemoteLogDownloadFuture buildNextChunkFuture(RemoteLogDownloadRequest request) {
+        Consumer<RemoteLogDownloadFuture> callback =
+                request.downloadFuture != null
+                        ? request.downloadFuture.getNextChunkCallback()
+                        : null;
+
+        CompletableFuture<LogRecords> nextChunkFuture = new CompletableFuture<>();
+        request.chunkFuture = nextChunkFuture;
+
+        RemoteLogDownloadFuture nextFuture =
+                new RemoteLogDownloadFuture(nextChunkFuture, () -> onChunkConsumed(request));
+        if (callback != null) {
+            nextFuture.setNextChunkCallback(callback);
+        }
+        request.downloadFuture = nextFuture;
+        return nextFuture;
+    }
+
+    /** Cleans up resources for a completed request: closes reader, deletes temp file. */
+    private void cleanupRequest(RemoteLogDownloadRequest request) {
+        if (request.cleanedUp) {
+            return;
+        }
+        request.cleanedUp = true;
+        if (request.reader != null) {
+            request.reader.close();
+            request.reader = null;
+        }
+        if (request.localFileRecords != null) {
+            try {
+                request.localFileRecords.deleteIfExists();
+            } catch (IOException e) {
+                LOG.warn(
+                        "Failed to delete temp file for segment {}.",
+                        request.segment.remoteLogSegmentId(),
+                        e);
+            }
+            request.localFileRecords = null;
+        }
     }
 
     /**
-     * Fetch a remote log segment file to local. This method will block until there is a log segment
-     * to fetch.
+     * Reads one chunk from a remote log segment. Continuations (subsequent chunks of an
+     * already-opened segment) are processed first without acquiring the semaphore. New segments
+     * require a semaphore permit.
      */
     void fetchOnce() throws Exception {
-        // blocks until there is capacity (the fetched file is consumed)
-        prefetchSemaphore.acquire();
+        // Priority 1: process continuations (no semaphore needed).
+        RemoteLogDownloadRequest request = continuationQueue.poll();
+        if (request != null) {
+            synchronized (request) {
+                request.queuedForContinuation = false;
+            }
+            processChunkRead(request);
+            return;
+        }
 
-        // wait until there is a remote fetch request
-        RemoteLogDownloadRequest request = segmentsToFetch.poll(pollTimeout, TimeUnit.MILLISECONDS);
+        // Priority 2: process new segments (semaphore needed).
+        // Use tryAcquire with timeout so the thread can periodically loop back and check
+        // the continuation queue for new chunk requests from already-active segments.
+        if (!prefetchSemaphore.tryAcquire(pollTimeout, TimeUnit.MILLISECONDS)) {
+            return;
+        }
+        try {
+            request = segmentsToFetch.poll(pollTimeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // Release the permit before propagating the interrupt, so that
+            // shutdown does not leak semaphore permits.
+            prefetchSemaphore.release();
+            throw e;
+        }
         if (request == null) {
             prefetchSemaphore.release();
             return;
         }
+        processChunkRead(request);
+    }
 
+    private void processChunkRead(RemoteLogDownloadRequest request) {
         TableBucket tableBucket = request.getTableBucket();
+        // Capture the chunkFuture early. A concurrent consumer thread may call
+        // createAndQueueNextChunk (via onChunkConsumed) and replace request.chunkFuture
+        // while we are reading/writing the chunk data.
+        final CompletableFuture<LogRecords> currentChunkFuture = request.chunkFuture;
         try {
-            // 1. cleanup the finished logs first to free up disk space
-            cleanupRemoteLogs();
+            // Lazily initialize the chunk reader and local temp file on first access.
+            if (request.reader == null) {
+                FsPath remotePath =
+                        getRemoteLogFilePath(request.remoteLogTabletDir, request.segment);
+                FileSystem fs = remotePath.getFileSystem();
+                FSDataInputStream inputStream = fs.open(remotePath);
+                request.reader =
+                        new RemoteSegmentChunkReader(
+                                inputStream,
+                                request.startPosition,
+                                request.segment.segmentSizeInBytes());
+
+                // Create a temp file for this segment. The File reference is owned by
+                // localFileRecords; deleteIfExists() in cleanupRequest handles removal.
+                File tempFile =
+                        new File(
+                                tmpDir,
+                                "remote-chunk-"
+                                        + request.segment.remoteLogSegmentId()
+                                        + "-"
+                                        + UUID.randomUUID()
+                                        + ".tmp");
+                request.localFileRecords = FileLogRecords.open(tempFile, true, false, 0, false);
+                request.localFileWrittenPosition = 0;
+            }
+
+            long startTime = System.currentTimeMillis();
+
+            // Read chunk from remote into memory (temporary).
+            MemoryLogRecords chunkData = request.reader.readNextChunk(chunkSize);
+
+            // Handle empty chunk (EOF or segment exhausted). This can happen when:
+            // 1. The segment has 0 bytes (degenerate case)
+            // 2. A continuation read reaches EOF (e.g., segment size metadata was larger
+            //    than the actual file, or the previous chunk already read all data)
+            //
+            // For the empty sentinel we do NOT increment chunksWritten. Instead, we directly
+            // invoke tryScheduleNextChunk which will find exhausted=true and
+            // chunksConsumed >= chunksWritten, triggering cleanup and semaphore release on the
+            // download thread. The consumer's later recycleCallback (from drain()) is harmless
+            // because cleanedUp=true will cause tryScheduleNextChunk to return immediately.
+            if (chunkData.sizeInBytes() == 0) {
+                currentChunkFuture.complete(MemoryLogRecords.EMPTY);
+                tryScheduleNextChunk(request);
+                return;
+            }
 
-            // 2. do the actual download work
-            FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
             scannerMetricGroup.remoteFetchRequestCount().inc();
 
-            long startTime = System.currentTimeMillis();
-            // download the remote file to local
-            remoteFileDownloader
-                    .downloadFileAsync(fsPathAndFileName, localLogDir)

Review Comment:
   The old fetchOnce dispatched the download to RemoteFileDownloader's thread pool (default 3) and returned immediately, so multiple segments could be downloading at once. The new processChunkRead does the fs.open + read inline on the dispatcher, so only one chunk is ever in flight.
   
   This looks like it serializes remote reads. Is that intentional, or am I missing something?
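If parallelism across segments is wanted, one option is to keep the dispatcher non-blocking and hand each blocking chunk read to a bounded pool, roughly as the old code did with RemoteFileDownloader. A minimal sketch with a hypothetical `ParallelChunkDispatcher` (not in the PR). Chunks of the same segment share one input stream, so they would presumably still need to be read one at a time; the parallelism here is across segments.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical dispatcher: runs each blocking chunk read on a fixed-size I/O pool,
// so reads for different segments can be in flight at the same time.
final class ParallelChunkDispatcher implements AutoCloseable {
    private final ExecutorService ioPool;

    ParallelChunkDispatcher(int ioThreads) {
        this.ioPool = Executors.newFixedThreadPool(ioThreads);
    }

    /** Returns immediately; the read itself runs on the I/O pool. */
    <T> CompletableFuture<T> dispatch(Supplier<T> blockingRead) {
        return CompletableFuture.supplyAsync(blockingRead, ioPool);
    }

    @Override
    public void close() {
        ioPool.shutdown();
        try {
            if (!ioPool.awaitTermination(10, TimeUnit.SECONDS)) {
                ioPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            ioPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```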



##########
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java:
##########
@@ -282,6 +253,117 @@ void testOrderOfRemoteLogDownloadRequest() {
         assertThat(results).isEqualTo(expected);
     }
 
+    @Test
+    void testMaxPrefetchChunks() throws Exception {
+        // Set maxPrefetchChunks = 2 to verify flow control.
+        conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS, 2);
+        // Set prefetchNum = 1 so only one segment is active.
+        conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 1);
+        RemoteLogDownloader remoteLogDownloader =
+                new RemoteLogDownloader(DATA1_TABLE_PATH, conf, scannerMetricGroup, 10L);
+        try {
+            remoteLogDownloader.start();
+
+            TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
+            // Build a large segment with multiple records so multiple chunks are produced.
+            List<RemoteLogSegment> remoteLogSegments =

Review Comment:
   It doesn't look big, at least not larger than the 8 MB chunk size: https://github.com/apache/fluss/blob/170e95f9039c54eb7255a84f761b609a77c7e594/fluss-common/src/test/java/org/apache/fluss/record/TestData.java#L44
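A back-of-the-envelope check, assuming chunks are cut purely by byte count (the real RemoteSegmentChunkReader may align chunks to batch boundaries): the number of chunks is ceil(segmentSize / chunkSize), so any segment smaller than the default 8 MB yields a single chunk, and maxPrefetchChunks = 2 is never reached. For example, a 4 KB segment is one chunk at 8 MB but four chunks at 1 KB. Setting a small chunk size in the test via `conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_CHUNK_SIZE, ...)` with a small MemorySize would presumably exercise the multi-chunk path. `ChunkMath` below is a hypothetical helper.

```java
// Hypothetical helper: chunk count for a segment, assuming fixed-size byte chunks.
final class ChunkMath {
    private ChunkMath() {}

    static long chunkCount(long segmentSizeBytes, long chunkSizeBytes) {
        if (segmentSizeBytes < 0 || chunkSizeBytes <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative and chunk size > 0");
        }
        // Integer ceiling division: ceil(segment / chunk).
        return (segmentSizeBytes + chunkSizeBytes - 1) / chunkSizeBytes;
    }
}
```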



-- 
This is an automated message from the Apache Git Service.