platinumhamburg commented on code in PR #2855:
URL: https://github.com/apache/fluss/pull/2855#discussion_r3013983635


##########
fluss-server/src/test/java/org/apache/fluss/server/kv/RemoteLogFetcherTest.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.exception.RemoteStorageException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.LogRecordBatch;
+import org.apache.fluss.remote.RemoteLogSegment;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.log.remote.RemoteLogTestBase;
+import org.apache.fluss.server.replica.Replica;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link RemoteLogFetcher}. */
+class RemoteLogFetcherTest extends RemoteLogTestBase {

Review Comment:
   There's no test for the case where `startOffset == localLogStartOffset`. If 
the comparison in `advance()` were accidentally changed from `>=` to `>`, the 
fetcher would return batches beyond the intended range — which during KV 
recovery means applying duplicate records.
   
   Would be great to add something like:
   
   ```java
   @Test
   void testFetchWithEmptyRange() throws Exception {
       // ... setup ...
       long sameOffset = segments.get(0).remoteLogStartOffset();
       try (RemoteLogFetcher fetcher = new RemoteLogFetcher(remoteLogManager, tb, dataDir)) {
           Iterable<LogRecordBatch> batches = fetcher.fetch(sameOffset, sameOffset);
           List<LogRecordBatch> batchList = new ArrayList<>();
           for (LogRecordBatch batch : batches) {
               batchList.add(batch);
           }
           assertThat(batchList).isEmpty();
       }
   }
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.RemoteStorageException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.FileLogRecords;
+import org.apache.fluss.record.LogRecordBatch;
+import org.apache.fluss.remote.RemoteLogSegment;
+import org.apache.fluss.server.log.remote.RemoteLogManager;
+import org.apache.fluss.server.log.remote.RemoteLogStorage;
+import org.apache.fluss.utils.FlussPaths;
+import org.apache.fluss.utils.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly;
+
+/**
+ * A utility class that fetches remote log segments and makes them available 
as {@link
+ * FileLogRecords} for KV recovery. It downloads remote log data into a local 
temporary directory
+ * using a UUID to avoid conflicts with other concurrent recovery operations.
+ *
+ * <p>The fetcher is {@link Closeable} and the caller must close it after use 
to clean up the
+ * temporary directory. It is recommended to use try-with-resources to ensure 
proper resource
+ * cleanup:
+ *
+ * <pre>{@code
+ * try (RemoteLogFetcher fetcher = new RemoteLogFetcher(...)) {
+ *     for (LogRecordBatch batch : fetcher.fetch(startOffset, 
localLogStartOffset)) {
+ *         // process batch
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p><b>Note:</b> This class is NOT thread-safe. Each instance should be used 
by a single thread
+ * only.
+ */
+public class RemoteLogFetcher implements Closeable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RemoteLogFetcher.class);
+
+    private static final String REMOTE_LOG_RECOVERY_DIR_PREFIX = 
"remote-log-recovery-";
+
+    private final RemoteLogManager remoteLogManager;
+    private final TableBucket tableBucket;
+    private final Path tempDir;
+
+    /** Tracks the currently active iterator to ensure proper cleanup on 
close. */
+    private volatile RemoteLogBatchIterator activeIterator;
+
+    public RemoteLogFetcher(
+            RemoteLogManager remoteLogManager, TableBucket tableBucket, File 
dataDir)
+            throws IOException {
+        this(
+                remoteLogManager,
+                tableBucket,
+                Files.createDirectories(
+                        dataDir.toPath()
+                                .resolve("tmp")
+                                .resolve(REMOTE_LOG_RECOVERY_DIR_PREFIX + 
UUID.randomUUID())));
+    }
+
+    @VisibleForTesting
+    RemoteLogFetcher(RemoteLogManager remoteLogManager, TableBucket 
tableBucket, Path tempDir)
+            throws IOException {
+        this.remoteLogManager = remoteLogManager;
+        this.tableBucket = tableBucket;
+        this.tempDir = tempDir;
+        Files.createDirectories(tempDir);
+    }
+
+    /**
+     * Fetches all relevant remote log segments that cover the range from 
{@code startOffset} up to
+     * {@code localLogStartOffset}, and iterates over the log record batches 
in order.
+     *
+     * <p>The returned {@link Iterable} is lazily loaded - remote log segments 
are downloaded and
+     * processed only when iterating through the batches. This means that file 
downloads and I/O
+     * operations occur during iteration, not when this method is called.
+     *
+     * @param startOffset the offset to start fetching from (inclusive)
+     * @param localLogStartOffset the local log start offset (exclusive, stop 
before this)
+     * @return an iterable over all {@link LogRecordBatch} from the fetched 
remote segments. The
+     *     iterator lazily downloads segments as needed.
+     * @throws Exception if any error occurs during fetching or reading
+     */
+    public Iterable<LogRecordBatch> fetch(long startOffset, long 
localLogStartOffset)
+            throws Exception {
+        List<RemoteLogSegment> segments =
+                remoteLogManager.relevantRemoteLogSegments(tableBucket, 
startOffset);
+        if (segments.isEmpty()) {
+            throw new RemoteStorageException(
+                    String.format(
+                            "No remote log segments found for table bucket %s 
at offset %d",
+                            tableBucket, startOffset));
+        }
+
+        LOG.info(
+                "Found {} remote log segments for table bucket {} from offset 
{} to local log start offset {}",
+                segments.size(),
+                tableBucket,
+                startOffset,
+                localLogStartOffset);
+
+        RemoteLogBatchIterator iterator =
+                new RemoteLogBatchIterator(segments, startOffset, 
localLogStartOffset);
+        this.activeIterator = iterator;

Review Comment:
   If `fetch()` is called more than once, the previous `RemoteLogBatchIterator` 
(and its open `FileLogRecords`) gets silently abandoned — file descriptors 
leak. I know the current call site only calls it once, but since the class 
implements `Closeable` and manages lifecycle, the contract should be 
self-consistent.
   
   Simple fix — close the old one first:
   
   ```java
   public Iterable<LogRecordBatch> fetch(long startOffset, long localLogStartOffset)
           throws Exception {
       // Close any previously active iterator before creating a new one
       RemoteLogBatchIterator prev = this.activeIterator;
       if (prev != null) {
           prev.close();
       }

       List<RemoteLogSegment> segments =
               remoteLogManager.relevantRemoteLogSegments(tableBucket, startOffset);
       // ... rest unchanged
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.RemoteStorageException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.FileLogRecords;
+import org.apache.fluss.record.LogRecordBatch;
+import org.apache.fluss.remote.RemoteLogSegment;
+import org.apache.fluss.server.log.remote.RemoteLogManager;
+import org.apache.fluss.server.log.remote.RemoteLogStorage;
+import org.apache.fluss.utils.FlussPaths;
+import org.apache.fluss.utils.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly;
+
+/**
+ * A utility class that fetches remote log segments and makes them available 
as {@link
+ * FileLogRecords} for KV recovery. It downloads remote log data into a local 
temporary directory
+ * using a UUID to avoid conflicts with other concurrent recovery operations.
+ *
+ * <p>The fetcher is {@link Closeable} and the caller must close it after use 
to clean up the
+ * temporary directory. It is recommended to use try-with-resources to ensure 
proper resource
+ * cleanup:
+ *
+ * <pre>{@code
+ * try (RemoteLogFetcher fetcher = new RemoteLogFetcher(...)) {
+ *     for (LogRecordBatch batch : fetcher.fetch(startOffset, 
localLogStartOffset)) {
+ *         // process batch
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p><b>Note:</b> This class is NOT thread-safe. Each instance should be used 
by a single thread
+ * only.
+ */
+public class RemoteLogFetcher implements Closeable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RemoteLogFetcher.class);
+
+    private static final String REMOTE_LOG_RECOVERY_DIR_PREFIX = 
"remote-log-recovery-";
+
+    private final RemoteLogManager remoteLogManager;
+    private final TableBucket tableBucket;
+    private final Path tempDir;
+
+    /** Tracks the currently active iterator to ensure proper cleanup on 
close. */
+    private volatile RemoteLogBatchIterator activeIterator;
+
+    public RemoteLogFetcher(
+            RemoteLogManager remoteLogManager, TableBucket tableBucket, File 
dataDir)
+            throws IOException {
+        this(
+                remoteLogManager,
+                tableBucket,
+                Files.createDirectories(
+                        dataDir.toPath()
+                                .resolve("tmp")
+                                .resolve(REMOTE_LOG_RECOVERY_DIR_PREFIX + 
UUID.randomUUID())));
+    }
+
+    @VisibleForTesting
+    RemoteLogFetcher(RemoteLogManager remoteLogManager, TableBucket 
tableBucket, Path tempDir)
+            throws IOException {
+        this.remoteLogManager = remoteLogManager;
+        this.tableBucket = tableBucket;
+        this.tempDir = tempDir;
+        Files.createDirectories(tempDir);
+    }
+
+    /**
+     * Fetches all relevant remote log segments that cover the range from 
{@code startOffset} up to
+     * {@code localLogStartOffset}, and iterates over the log record batches 
in order.
+     *
+     * <p>The returned {@link Iterable} is lazily loaded - remote log segments 
are downloaded and
+     * processed only when iterating through the batches. This means that file 
downloads and I/O
+     * operations occur during iteration, not when this method is called.
+     *
+     * @param startOffset the offset to start fetching from (inclusive)
+     * @param localLogStartOffset the local log start offset (exclusive, stop 
before this)
+     * @return an iterable over all {@link LogRecordBatch} from the fetched 
remote segments. The
+     *     iterator lazily downloads segments as needed.
+     * @throws Exception if any error occurs during fetching or reading
+     */
+    public Iterable<LogRecordBatch> fetch(long startOffset, long 
localLogStartOffset)
+            throws Exception {
+        List<RemoteLogSegment> segments =
+                remoteLogManager.relevantRemoteLogSegments(tableBucket, 
startOffset);
+        if (segments.isEmpty()) {
+            throw new RemoteStorageException(
+                    String.format(
+                            "No remote log segments found for table bucket %s 
at offset %d",
+                            tableBucket, startOffset));
+        }
+
+        LOG.info(
+                "Found {} remote log segments for table bucket {} from offset 
{} to local log start offset {}",
+                segments.size(),
+                tableBucket,
+                startOffset,
+                localLogStartOffset);
+
+        RemoteLogBatchIterator iterator =
+                new RemoteLogBatchIterator(segments, startOffset, 
localLogStartOffset);
+        this.activeIterator = iterator;
+        return () -> iterator;
+    }
+
+    @Override
+    public void close() {
+        // Close any active iterator to release file handles
+        if (activeIterator != null) {
+            activeIterator.close();
+            activeIterator = null;
+        }
+        LOG.info("Cleaning up remote log recovery temp dir: {}", tempDir);
+        deleteDirectoryQuietly(tempDir.toFile());
+    }
+
+    @VisibleForTesting
+    Path getTempDir() {
+        return tempDir;
+    }
+
+    /**
+     * Downloads the log data of a remote log segment to a local temporary 
file.
+     *
+     * @return the local file containing the downloaded log data
+     */
+    private File downloadSegment(RemoteLogSegment segment) throws IOException {
+        File localFile =
+                tempDir.resolve(
+                                
FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
+                                        + ".log")
+                        .toFile();
+
+        RemoteLogStorage remoteLogStorage = 
remoteLogManager.getRemoteLogStorage();
+        LOG.info(
+                "Downloading remote log segment {} (offsets {}-{}) to {}",
+                segment.remoteLogSegmentId(),
+                segment.remoteLogStartOffset(),
+                segment.remoteLogEndOffset(),
+                localFile);
+
+        try (InputStream inputStream = remoteLogStorage.fetchLogData(segment);
+                OutputStream outputStream = 
Files.newOutputStream(localFile.toPath())) {
+            IOUtils.copyBytes(inputStream, outputStream, false);
+        } catch (RemoteStorageException e) {
+            throw new IOException(
+                    "Failed to download remote log segment: " + 
segment.remoteLogSegmentId(), e);
+        }
+        return localFile;
+    }
+
+    /**
+     * An iterator that lazily downloads remote log segments and iterates over 
their batches in
+     * order. It respects the startOffset and localLogStartOffset boundaries, 
yielding only batches
+     * within [startOffset, localLogStartOffset).
+     */
+    private class RemoteLogBatchIterator implements Iterator<LogRecordBatch> {
+        private final List<RemoteLogSegment> segments;
+        private final long localLogStartOffset;
+
+        /** Tracks the current read offset, advancing as batches are consumed. 
*/
+        private long currentOffset;
+
+        private int currentSegmentIndex = 0;
+        private FileLogRecords currentFileLogRecords;
+        private Iterator<LogRecordBatch> currentBatchIterator;
+        private LogRecordBatch nextBatch;
+        private boolean finished = false;
+        private volatile boolean closed = false;
+
+        RemoteLogBatchIterator(
+                List<RemoteLogSegment> segments, long startOffset, long 
localLogStartOffset) {
+            this.segments = segments;
+            this.currentOffset = startOffset;
+            this.localLogStartOffset = localLogStartOffset;
+        }
+
+        /** Closes this iterator and releases all held resources. */
+        public void close() {
+            if (!closed) {
+                closed = true;
+                closeCurrentFileLogRecords();
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            // Lazily advance: only fetch next batch when needed. This ensures 
the
+            // previously returned FileChannelLogRecordBatch has been fully 
consumed
+            // by the caller before advance() potentially closes its 
underlying file.
+            if (nextBatch == null && !finished) {
+                advance();
+            }
+            return nextBatch != null;
+        }
+
+        @Override
+        public LogRecordBatch next() {
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+            LogRecordBatch result = nextBatch;
+            nextBatch = null;
+            return result;
+        }
+
+        private void advance() {
+            nextBatch = null;
+            while (!finished) {
+                // try to get next batch from current iterator
+                if (currentBatchIterator != null && 
currentBatchIterator.hasNext()) {
+                    LogRecordBatch batch = currentBatchIterator.next();
+                    // skip batches entirely before currentOffset
+                    if (batch.nextLogOffset() <= currentOffset) {
+                        continue;
+                    }
+                    // stop if we've reached localLogStartOffset
+                    if (batch.baseLogOffset() >= localLogStartOffset) {
+                        finished = true;
+                        closeCurrentFileLogRecords();
+                        return;
+                    }
+                    nextBatch = batch;
+                    // advance currentOffset so subsequent segments use 
updated position
+                    currentOffset = batch.nextLogOffset();
+                    return;
+                }
+
+                // close current file log records
+                closeCurrentFileLogRecords();
+
+                // move to next segment
+                if (currentSegmentIndex >= segments.size()) {
+                    finished = true;
+                    return;
+                }
+
+                RemoteLogSegment segment = segments.get(currentSegmentIndex++);
+                // skip segments entirely before currentOffset
+                if (segment.remoteLogEndOffset() <= currentOffset) {

Review Comment:
   
   `RemoteLogSegment.remoteLogEndOffset()` is documented as inclusive ("Remote 
log end offset of this segment (inclusive)"), but the skip check here uses `<=`:
   
   ```java
   if (segment.remoteLogEndOffset() <= currentOffset) {
       continue;
   }
   ```
   
   When `currentOffset` equals the segment's last record offset, the segment 
gets incorrectly skipped — that record is lost. During KV recovery this could 
cause data inconsistency.
   
   Should be strict less-than if the end offset is truly inclusive:
   
   ```java
   // remoteLogEndOffset() is inclusive — last offset in segment
   if (segment.remoteLogEndOffset() < currentOffset) {
       continue;
   }
   ```
   
   Or if it's actually exclusive, a clarifying comment would help since the 
Javadoc says otherwise.
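   
   For what it's worth, the off-by-one is easy to see with concrete numbers (the offsets below are made up for illustration, not taken from the test data):
   
   ```java
   public class BoundarySkipDemo {
       public static void main(String[] args) {
           // hypothetical segment holding offsets 100..199 (end offset inclusive)
           long remoteLogEndOffset = 199;
           // the next record the fetcher still needs
           long currentOffset = 199;

           // current check: 199 <= 199 is true, so the segment is skipped
           // even though it still contains record 199
           boolean skippedByCurrentCheck = remoteLogEndOffset <= currentOffset;

           // proposed check: 199 < 199 is false, so the segment is kept
           boolean skippedByProposedCheck = remoteLogEndOffset < currentOffset;

           System.out.println(skippedByCurrentCheck);  // true
           System.out.println(skippedByProposedCheck); // false
       }
   }
   ```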
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
