fresh-borzoni commented on code in PR #3132:
URL: https://github.com/apache/fluss/pull/3132#discussion_r3220334466
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -333,12 +555,202 @@ private void advance() {
}
}
+ /**
+ * Obtain the local file for {@code segment}, ideally reusing a
prefetched download. On
+ * success the ring head is consumed and a refill is triggered, so the
window stays full as
+ * long as more segments remain.
+ */
+ private File fetchSegmentFile(RemoteLogSegment segment) throws
IOException {
+ // Invariant: fillPrefetchWindow() and advance() walk the same
pre-filtered
+ // `segments` list in strict order with identical skip rules;
consumption is
+ // single-threaded FIFO; prefetchNum >= 1 (Math.max(1, ...) in the
ctor). So
+ // the ring head is *always* the segment advance() is asking for.
+ int headSlot = nextConsumeIndex % prefetchNum;
+ RemoteLogSegment headSegment = prefetchSegments[headSlot];
+ assert isSameSegmentId(segment, headSegment)
+ : "prefetch ring head "
+ + (headSegment == null ? "null" :
headSegment.remoteLogSegmentId())
+ + " does not match requested "
+ + segment.remoteLogSegmentId();
+ Future<File> headFuture = prefetchSlots[headSlot];
+ // Eagerly null the slot refs so the ring never keeps stale
references even if the
+ // consumer throws below — also releases the RemoteLogSegment for
GC early.
+ prefetchSlots[headSlot] = null;
+ prefetchSegments[headSlot] = null;
+ nextConsumeIndex++;
+ try {
+ return headFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Interrupted while waiting for remote log segment
download: "
+ + segment.remoteLogSegmentId(),
+ e);
+ } catch (CancellationException e) {
+ LOG.warn(
+ "Prefetched segment {} was cancelled, fallback to sync
download.",
+ segment.remoteLogSegmentId(),
+ e);
+ return downloadSegmentWithRetry(segment);
+ } catch (ExecutionException e) {
+ LOG.warn(
+ "Prefetched segment {} failed even after async
retries, "
+ + "fallback to one more synchronous retry
round.",
+ segment.remoteLogSegmentId(),
+ e.getCause());
+ return downloadSegmentWithRetry(segment);
+ } finally {
+ // Window slot released — try to submit the next prefetch
right away.
+ fillPrefetchWindow();
+ }
+ }
+
+ private boolean isSameSegmentId(RemoteLogSegment left,
RemoteLogSegment right) {
+ return left != null
+ && right != null
+ &&
left.remoteLogSegmentId().equals(right.remoteLogSegmentId());
+ }
+
+ /**
+ * Submit as many download tasks as fit in the window. Non-blocking:
if the window is full
+ * we stop and wait for the consumer to advance.
+ */
+ private void fillPrefetchWindow() {
+ if (closed) {
+ return;
+ }
+
+ while (nextPrefetchIndex < segments.size()) {
+ RemoteLogSegment segment = segments.get(nextPrefetchIndex);
+
+ // segment entirely before currentOffset — skip and advance
index.
+ if (segment.remoteLogEndOffset() < currentOffset) {
+ nextPrefetchIndex++;
+ continue;
+ }
+ // segment starts at or beyond localLogStartOffset — nothing
more to prefetch.
+ if (segment.remoteLogStartOffset() >= localLogStartOffset) {
+ return;
+ }
+
+ // Window full — back off and wait for the consumer to advance.
+ if (prefetchWindowSize() >= prefetchNum) {
+ return;
+ }
+
+ final RemoteLogSegment target = segment;
+ Future<File> future;
+ try {
+ // submit(Callable) returns a FutureTask whose
cancel(true) actually calls
+ // Thread.interrupt() on the worker, unlike
CompletableFuture.cancel which
+ // only flips state. This lets stale-fallback / inject /
second-fetch paths
+ // really stop in-flight downloads instead of orphaning
their files in tempDir.
+ future =
+ downloadExecutor.submit(
+ (Callable<File>) () ->
downloadSegmentWithRetry(target));
+ } catch (Throwable submitError) {
+ // The executor rejected the task (e.g. we're shutting
down); stop refilling.
+ // No slot has been claimed yet, so there is nothing to
roll back.
+ LOG.debug(
+ "Failed to submit prefetch for segment {}
(executor likely shutting down).",
+ target.remoteLogSegmentId(),
+ submitError);
+ return;
+ }
+
+ int slot = nextPrefetchIndex % prefetchNum;
+ prefetchSlots[slot] = future;
+ prefetchSegments[slot] = target;
+ nextPrefetchIndex++;
+ LOG.debug(
+ "Prefetching remote log segment {} for bucket {}
(window size={}, free slots={}).",
+ target.remoteLogSegmentId(),
+ tableBucket,
+ prefetchWindowSize(),
+ prefetchNum - prefetchWindowSize());
+ }
+ }
+
+ /**
+ * Cancel or clean up every entry currently in the window. Used on
close, iterator failure,
+ * and stale-ring fallback. After this runs the invariant {@code
nextConsumeIndex ==
+ * nextPrefetchIndex} holds, so the window is empty.
+ */
+ private void drainPrefetchWindow() {
+ while (nextConsumeIndex < nextPrefetchIndex) {
+ int slot = nextConsumeIndex % prefetchNum;
+ Future<File> future = prefetchSlots[slot];
+ prefetchSlots[slot] = null;
+ prefetchSegments[slot] = null;
+ nextConsumeIndex++;
+ if (future.isDone()) {
+ if (!cleanupCompletedFuture(future)) {
+ return;
+ }
+ } else {
+ if (!future.cancel(true) &&
!cleanupCompletedFuture(future)) {
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Drain a future that is (now) known to be completed: pull its file
out via {@link
+ * Future#get()} and delete it. {@code isDone()} guarantees {@code
get()} does not block.
+ *
+ * @return {@code true} on a clean drain, {@code false} if this thread
was interrupted while
+ * calling {@code get()} (caller should bail out and let close()'s
shutdownNow path
+ * reclaim the remaining slots).
+ */
+ private boolean cleanupCompletedFuture(Future<File> future) {
+ try {
+ // isDone() is true so get() will not block; it returns either
the
+ // successfully downloaded file, or throws
Cancellation/Execution.
+ cleanupUnusedPrefetchedFile(future.get());
+ } catch (CancellationException | ExecutionException ignored) {
+ // no local file to clean up
+ } catch (InterruptedException e) {
+ // A completed future never actually blocks here, so an
interrupt on this
+ // thread came from elsewhere. Restore the flag and bail out —
close()'s
+ // follow-up shutdownNow()/awaitTermination path will reclaim
whatever's
+ // left in the ring.
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return true;
+ }
+
+ private void cleanupUnusedPrefetchedFile(File prefetchedFile) {
+ if (prefetchedFile == null) {
+ return;
+ }
+
+ try {
+ Files.deleteIfExists(prefetchedFile.toPath());
+ } catch (IOException cleanupException) {
+ LOG.warn(
+ "Failed to cleanup unused prefetched segment file {}
for table bucket {}.",
+ prefetchedFile,
+ tableBucket,
+ cleanupException);
+ }
+ }
+
private void closeCurrentFileLogRecords() {
Review Comment:
The new config bounds “downloaded but not yet consumed” segments, but
consumed segment files are not deleted when we move to the next segment.
closeCurrentFileLogRecords() closes FileLogRecords, but the underlying .log
file remains in tempDir until fetcher close.
Could we track the currently opened local segment file and delete it after
closing FileLogRecords? Something like currentLocalFile, set after
fetchSegmentFile(), then delete it in closeCurrentFileLogRecords().
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -136,31 +188,60 @@ public Iterable<LogRecordBatch> fetch(long startOffset,
long localLogStartOffset
}
LOG.info(
- "Found {} remote log segments for table bucket {} from offset
{} to localLogStartOffset {}",
+ "Found {} remote log segments for table bucket {} from offset
{} to localLogStartOffset {} "
+ + "(prefetchNum={}, downloadThreads={})",
segments.size(),
tableBucket,
startOffset,
- localLogStartOffset);
+ localLogStartOffset,
+ prefetchNum,
+ ((java.util.concurrent.ThreadPoolExecutor)
downloadExecutor).getCorePoolSize());
RemoteLogBatchIterator iterator =
new RemoteLogBatchIterator(segments, startOffset,
localLogStartOffset);
this.activeIterator = iterator;
+ // Kick off the initial prefetch window eagerly so that the very first
advance() can
+ // consume from the ring instead of falling back to a synchronous
download.
+ iterator.fillPrefetchWindow();
return () -> iterator;
}
@Override
public void close() {
- // Close any active iterator to release file handles
- if (activeIterator != null) {
- activeIterator.close();
- activeIterator = null;
- }
- // Remove the entire "tmp" parent directory to clean up our
subdirectory as well
- // as any stale recovery directories left by a previous failed
recovery.
- Path tmpDir = tempDir.getParent();
- if (tmpDir != null && Files.exists(tmpDir)) {
- LOG.info("Cleaning up remote log recovery tmp dir: {}", tmpDir);
- deleteDirectoryQuietly(tmpDir.toFile());
+ try {
+ // Close any active iterator to release file handles
+ if (activeIterator != null) {
+ activeIterator.close();
+ activeIterator = null;
+ }
+ } finally {
+ downloadExecutor.shutdownNow();
+
+ boolean terminated = false;
+ try {
+ terminated =
+ downloadExecutor.awaitTermination(1,
java.util.concurrent.TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn(
+ "Interrupted while waiting for remote log fetcher
download executor termination for table bucket {}.",
+ tableBucket,
+ e);
+ }
+
+ if (terminated) {
Review Comment:
This cleanup path seems risky in two ways.
First, if awaitTermination() times out, we skip cleanup entirely, so one
uncooperative download can leave the recovery directory behind.
Second, tempDir.getParent() is the shared tmp/ directory under the log dir,
not just this fetcher's recovery directory. That is broader than what this
RemoteLogFetcher owns.
Could we delete tempDir itself, not its parent, and attempt that best-effort
regardless of whether the executor terminated? If deletion fails because a file
is still open, logging that is fine, but I don't think we should skip the
cleanup attempt.
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -333,12 +555,202 @@ private void advance() {
}
}
+ /**
+ * Obtain the local file for {@code segment}, ideally reusing a
prefetched download. On
+ * success the ring head is consumed and a refill is triggered, so the
window stays full as
+ * long as more segments remain.
+ */
+ private File fetchSegmentFile(RemoteLogSegment segment) throws
IOException {
+ // Invariant: fillPrefetchWindow() and advance() walk the same
pre-filtered
+ // `segments` list in strict order with identical skip rules;
consumption is
+ // single-threaded FIFO; prefetchNum >= 1 (Math.max(1, ...) in the
ctor). So
+ // the ring head is *always* the segment advance() is asking for.
+ int headSlot = nextConsumeIndex % prefetchNum;
+ RemoteLogSegment headSegment = prefetchSegments[headSlot];
+ assert isSameSegmentId(segment, headSegment)
Review Comment:
This invariant is important enough that I don't think assert is the right
guard. Assertions are normally disabled in production.
Could we make this an explicit runtime check, drain the prefetch window, and
throw IOException with the requested/head segment ids?
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -189,16 +294,80 @@ private File downloadSegment(RemoteLogSegment segment)
throws IOException {
segment.remoteLogEndOffset(),
localFile);
+ boolean success = false;
try (InputStream inputStream = remoteLogStorage.fetchLogData(segment);
OutputStream outputStream =
Files.newOutputStream(localFile.toPath())) {
IOUtils.copyBytes(inputStream, outputStream, false);
+ success = true;
} catch (RemoteStorageException e) {
throw new IOException(
"Failed to download remote log segment: " +
segment.remoteLogSegmentId(), e);
+ } finally {
+ // Most remote/file InputStreams don't honor Thread.interrupt()
mid-read: the
+ // call simply returns a normally-completed copy and we'd fall
through with
+ // success=true while the worker has been interrupted by
close()/cancel(true).
+ // Treat "interrupt observed during the copy" as a failure for
cleanup purposes
+ // so we don't leave a stale segment file behind in tempDir.
+ if (!success || Thread.currentThread().isInterrupted()) {
Review Comment:
In downloadSegment(), if the copy succeeds but the thread has been
interrupted, the finally block deletes localFile but the method can still
return that File object.
Usually Future.cancel(true) masks this because the Future is cancelled, but
interrupt races can still make this path observable. Could we throw after
deleting on the interrupted path rather than returning a file that may no
longer exist?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]