platinumhamburg commented on code in PR #2855: URL: https://github.com/apache/fluss/pull/2855#discussion_r3013983635
########## fluss-server/src/test/java/org/apache/fluss/server/kv/RemoteLogFetcherTest.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv; + +import org.apache.fluss.exception.RemoteStorageException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.LogRecordBatch; +import org.apache.fluss.remote.RemoteLogSegment; +import org.apache.fluss.server.log.LogTablet; +import org.apache.fluss.server.log.remote.RemoteLogTestBase; +import org.apache.fluss.server.replica.Replica; + +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link RemoteLogFetcher}. */ +class RemoteLogFetcherTest extends RemoteLogTestBase { Review Comment: There's no test for the case where `startOffset == localLogStartOffset`. 
If the comparison in `advance()` were accidentally changed from `>=` to `>`, the fetcher would return batches beyond the intended range — which during KV recovery means applying duplicate records. Would be great to add something like: ```java @Test void testFetchWithEmptyRange() throws Exception { // ... setup ... long sameOffset = segments.get(0).remoteLogStartOffset(); try (RemoteLogFetcher fetcher = new RemoteLogFetcher(remoteLogManager, tb, dataDir)) { Iterable<LogRecordBatch> batches = fetcher.fetch(sameOffset, sameOffset); List<LogRecordBatch> batchList = new ArrayList<>(); for (LogRecordBatch batch : batches) { batchList.add(batch); } assertThat(batchList).isEmpty(); } } ``` ########## fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.RemoteStorageException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.FileLogRecords; +import org.apache.fluss.record.LogRecordBatch; +import org.apache.fluss.remote.RemoteLogSegment; +import org.apache.fluss.server.log.remote.RemoteLogManager; +import org.apache.fluss.server.log.remote.RemoteLogStorage; +import org.apache.fluss.utils.FlussPaths; +import org.apache.fluss.utils.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly; + +/** + * A utility class that fetches remote log segments and makes them available as {@link + * FileLogRecords} for KV recovery. It downloads remote log data into a local temporary directory + * using a UUID to avoid conflicts with other concurrent recovery operations. + * + * <p>The fetcher is {@link Closeable} and the caller must close it after use to clean up the + * temporary directory. It is recommended to use try-with-resources to ensure proper resource + * cleanup: + * + * <pre>{@code + * try (RemoteLogFetcher fetcher = new RemoteLogFetcher(...)) { + * for (LogRecordBatch batch : fetcher.fetch(startOffset, localLogStartOffset)) { + * // process batch + * } + * } + * }</pre> + * + * <p><b>Note:</b> This class is NOT thread-safe. Each instance should be used by a single thread + * only. 
+ */ +public class RemoteLogFetcher implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RemoteLogFetcher.class); + + private static final String REMOTE_LOG_RECOVERY_DIR_PREFIX = "remote-log-recovery-"; + + private final RemoteLogManager remoteLogManager; + private final TableBucket tableBucket; + private final Path tempDir; + + /** Tracks the currently active iterator to ensure proper cleanup on close. */ + private volatile RemoteLogBatchIterator activeIterator; + + public RemoteLogFetcher( + RemoteLogManager remoteLogManager, TableBucket tableBucket, File dataDir) + throws IOException { + this( + remoteLogManager, + tableBucket, + Files.createDirectories( + dataDir.toPath() + .resolve("tmp") + .resolve(REMOTE_LOG_RECOVERY_DIR_PREFIX + UUID.randomUUID()))); + } + + @VisibleForTesting + RemoteLogFetcher(RemoteLogManager remoteLogManager, TableBucket tableBucket, Path tempDir) + throws IOException { + this.remoteLogManager = remoteLogManager; + this.tableBucket = tableBucket; + this.tempDir = tempDir; + Files.createDirectories(tempDir); + } + + /** + * Fetches all relevant remote log segments that cover the range from {@code startOffset} up to + * {@code localLogStartOffset}, and iterates over the log record batches in order. + * + * <p>The returned {@link Iterable} is lazily loaded - remote log segments are downloaded and + * processed only when iterating through the batches. This means that file downloads and I/O + * operations occur during iteration, not when this method is called. + * + * @param startOffset the offset to start fetching from (inclusive) + * @param localLogStartOffset the local log start offset (exclusive, stop before this) + * @return an iterable over all {@link LogRecordBatch} from the fetched remote segments. The + * iterator lazily downloads segments as needed. 
+ * @throws Exception if any error occurs during fetching or reading + */ + public Iterable<LogRecordBatch> fetch(long startOffset, long localLogStartOffset) + throws Exception { + List<RemoteLogSegment> segments = + remoteLogManager.relevantRemoteLogSegments(tableBucket, startOffset); + if (segments.isEmpty()) { + throw new RemoteStorageException( + String.format( + "No remote log segments found for table bucket %s at offset %d", + tableBucket, startOffset)); + } + + LOG.info( + "Found {} remote log segments for table bucket {} from offset {} to local log start offset {}", + segments.size(), + tableBucket, + startOffset, + localLogStartOffset); + + RemoteLogBatchIterator iterator = + new RemoteLogBatchIterator(segments, startOffset, localLogStartOffset); + this.activeIterator = iterator; Review Comment: If `fetch()` is called more than once, the previous `RemoteLogBatchIterator` (and its open `FileLogRecords`) gets silently abandoned — file descriptors leak. I know the current call site only calls it once, but since the class implements `Closeable` and manages lifecycle, the contract should be self-consistent. Simple fix — close the old one first: ```java public Iterable<LogRecordBatch> fetch(long startOffset, long localLogStartOffset) throws Exception { // Close any previously active iterator before creating a new one RemoteLogBatchIterator prev = this.activeIterator; if (prev != null) { prev.close(); } List<RemoteLogSegment> segments = remoteLogManager.relevantRemoteLogSegments(tableBucket, startOffset); // ... rest unchanged ``` ########## fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.RemoteStorageException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.FileLogRecords; +import org.apache.fluss.record.LogRecordBatch; +import org.apache.fluss.remote.RemoteLogSegment; +import org.apache.fluss.server.log.remote.RemoteLogManager; +import org.apache.fluss.server.log.remote.RemoteLogStorage; +import org.apache.fluss.utils.FlussPaths; +import org.apache.fluss.utils.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +import static org.apache.fluss.utils.FileUtils.deleteDirectoryQuietly; + +/** + * A utility class that fetches remote log segments and makes them available as {@link + * FileLogRecords} for KV recovery. It downloads remote log data into a local temporary directory + * using a UUID to avoid conflicts with other concurrent recovery operations. + * + * <p>The fetcher is {@link Closeable} and the caller must close it after use to clean up the + * temporary directory. 
It is recommended to use try-with-resources to ensure proper resource + * cleanup: + * + * <pre>{@code + * try (RemoteLogFetcher fetcher = new RemoteLogFetcher(...)) { + * for (LogRecordBatch batch : fetcher.fetch(startOffset, localLogStartOffset)) { + * // process batch + * } + * } + * }</pre> + * + * <p><b>Note:</b> This class is NOT thread-safe. Each instance should be used by a single thread + * only. + */ +public class RemoteLogFetcher implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RemoteLogFetcher.class); + + private static final String REMOTE_LOG_RECOVERY_DIR_PREFIX = "remote-log-recovery-"; + + private final RemoteLogManager remoteLogManager; + private final TableBucket tableBucket; + private final Path tempDir; + + /** Tracks the currently active iterator to ensure proper cleanup on close. */ + private volatile RemoteLogBatchIterator activeIterator; + + public RemoteLogFetcher( + RemoteLogManager remoteLogManager, TableBucket tableBucket, File dataDir) + throws IOException { + this( + remoteLogManager, + tableBucket, + Files.createDirectories( + dataDir.toPath() + .resolve("tmp") + .resolve(REMOTE_LOG_RECOVERY_DIR_PREFIX + UUID.randomUUID()))); + } + + @VisibleForTesting + RemoteLogFetcher(RemoteLogManager remoteLogManager, TableBucket tableBucket, Path tempDir) + throws IOException { + this.remoteLogManager = remoteLogManager; + this.tableBucket = tableBucket; + this.tempDir = tempDir; + Files.createDirectories(tempDir); + } + + /** + * Fetches all relevant remote log segments that cover the range from {@code startOffset} up to + * {@code localLogStartOffset}, and iterates over the log record batches in order. + * + * <p>The returned {@link Iterable} is lazily loaded - remote log segments are downloaded and + * processed only when iterating through the batches. This means that file downloads and I/O + * operations occur during iteration, not when this method is called. 
+ * + * @param startOffset the offset to start fetching from (inclusive) + * @param localLogStartOffset the local log start offset (exclusive, stop before this) + * @return an iterable over all {@link LogRecordBatch} from the fetched remote segments. The + * iterator lazily downloads segments as needed. + * @throws Exception if any error occurs during fetching or reading + */ + public Iterable<LogRecordBatch> fetch(long startOffset, long localLogStartOffset) + throws Exception { + List<RemoteLogSegment> segments = + remoteLogManager.relevantRemoteLogSegments(tableBucket, startOffset); + if (segments.isEmpty()) { + throw new RemoteStorageException( + String.format( + "No remote log segments found for table bucket %s at offset %d", + tableBucket, startOffset)); + } + + LOG.info( + "Found {} remote log segments for table bucket {} from offset {} to local log start offset {}", + segments.size(), + tableBucket, + startOffset, + localLogStartOffset); + + RemoteLogBatchIterator iterator = + new RemoteLogBatchIterator(segments, startOffset, localLogStartOffset); + this.activeIterator = iterator; + return () -> iterator; + } + + @Override + public void close() { + // Close any active iterator to release file handles + if (activeIterator != null) { + activeIterator.close(); + activeIterator = null; + } + LOG.info("Cleaning up remote log recovery temp dir: {}", tempDir); + deleteDirectoryQuietly(tempDir.toFile()); + } + + @VisibleForTesting + Path getTempDir() { + return tempDir; + } + + /** + * Downloads the log data of a remote log segment to a local temporary file. 
+ * + * @return the local file containing the downloaded log data + */ + private File downloadSegment(RemoteLogSegment segment) throws IOException { + File localFile = + tempDir.resolve( + FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset()) + + ".log") + .toFile(); + + RemoteLogStorage remoteLogStorage = remoteLogManager.getRemoteLogStorage(); + LOG.info( + "Downloading remote log segment {} (offsets {}-{}) to {}", + segment.remoteLogSegmentId(), + segment.remoteLogStartOffset(), + segment.remoteLogEndOffset(), + localFile); + + try (InputStream inputStream = remoteLogStorage.fetchLogData(segment); + OutputStream outputStream = Files.newOutputStream(localFile.toPath())) { + IOUtils.copyBytes(inputStream, outputStream, false); + } catch (RemoteStorageException e) { + throw new IOException( + "Failed to download remote log segment: " + segment.remoteLogSegmentId(), e); + } + return localFile; + } + + /** + * An iterator that lazily downloads remote log segments and iterates over their batches in + * order. It respects the startOffset and localLogStartOffset boundaries, yielding only batches + * within [startOffset, localLogStartOffset). + */ + private class RemoteLogBatchIterator implements Iterator<LogRecordBatch> { + private final List<RemoteLogSegment> segments; + private final long localLogStartOffset; + + /** Tracks the current read offset, advancing as batches are consumed. */ + private long currentOffset; + + private int currentSegmentIndex = 0; + private FileLogRecords currentFileLogRecords; + private Iterator<LogRecordBatch> currentBatchIterator; + private LogRecordBatch nextBatch; + private boolean finished = false; + private volatile boolean closed = false; + + RemoteLogBatchIterator( + List<RemoteLogSegment> segments, long startOffset, long localLogStartOffset) { + this.segments = segments; + this.currentOffset = startOffset; + this.localLogStartOffset = localLogStartOffset; + } + + /** Closes this iterator and releases all held resources. 
*/ + public void close() { + if (!closed) { + closed = true; + closeCurrentFileLogRecords(); + } + } + + @Override + public boolean hasNext() { + // Lazily advance: only fetch next batch when needed. This ensures the + // previously returned FileChannelLogRecordBatch has been fully consumed + // by the caller before advance() potentially closes its underlying file. + if (nextBatch == null && !finished) { + advance(); + } + return nextBatch != null; + } + + @Override + public LogRecordBatch next() { + if (!hasNext()) { + throw new java.util.NoSuchElementException(); + } + LogRecordBatch result = nextBatch; + nextBatch = null; + return result; + } + + private void advance() { + nextBatch = null; + while (!finished) { + // try to get next batch from current iterator + if (currentBatchIterator != null && currentBatchIterator.hasNext()) { + LogRecordBatch batch = currentBatchIterator.next(); + // skip batches entirely before currentOffset + if (batch.nextLogOffset() <= currentOffset) { + continue; + } + // stop if we've reached localLogStartOffset + if (batch.baseLogOffset() >= localLogStartOffset) { + finished = true; + closeCurrentFileLogRecords(); + return; + } + nextBatch = batch; + // advance currentOffset so subsequent segments use updated position + currentOffset = batch.nextLogOffset(); + return; + } + + // close current file log records + closeCurrentFileLogRecords(); + + // move to next segment + if (currentSegmentIndex >= segments.size()) { + finished = true; + return; + } + + RemoteLogSegment segment = segments.get(currentSegmentIndex++); + // skip segments entirely before currentOffset + if (segment.remoteLogEndOffset() <= currentOffset) { Review Comment: `RemoteLogSegment.remoteLogEndOffset()` is documented as inclusive ("Remote log end offset of this segment (inclusive)"), but the skip check here uses `<=`: ```java if (segment.remoteLogEndOffset() <= currentOffset) { continue; } ``` When `currentOffset` equals the segment's last record offset, the segment 
gets incorrectly skipped — that record is lost. During KV recovery this could cause data inconsistency. Should be strict less-than if the end offset is truly inclusive: ```java // remoteLogEndOffset() is inclusive — last offset in segment if (segment.remoteLogEndOffset() < currentOffset) { continue; } ``` Or if it's actually exclusive, a clarifying comment would help since the Javadoc says otherwise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
