Kaixuan-Duan commented on code in PR #3132:
URL: https://github.com/apache/fluss/pull/3132#discussion_r3224017718
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -333,12 +555,202 @@ private void advance() {
}
}
+ /**
+  * Obtain the local file for {@code segment}, ideally reusing a prefetched download.
+  *
+  * <p>The ring head is consumed (slot cleared, {@code nextConsumeIndex} advanced) before
+  * waiting on the download, and {@code fillPrefetchWindow()} runs on every exit path, so the
+  * window is refilled as long as more segments remain.
+  *
+  * <p>If the prefetched download was cancelled or failed, the segment is downloaded
+  * synchronously via {@code downloadSegmentWithRetry}. If this thread is interrupted while
+  * waiting, the already-dequeued download is cancelled (or, if it finished in the meantime,
+  * its file is deleted) before the interrupt is reported: the future has left the ring, so
+  * {@code drainPrefetchWindow()} can no longer reach it.
+  *
+  * @param segment the segment the consumer needs next; expected to match the ring head
+  * @return the local file holding the segment's data
+  * @throws IOException if the wait is interrupted, or propagated from the synchronous
+  *     fallback download
+  */
+ private File fetchSegmentFile(RemoteLogSegment segment) throws IOException {
+     // Invariant: fillPrefetchWindow() and advance() walk the same pre-filtered `segments`
+     // list in strict order with identical skip rules; consumption is single-threaded FIFO;
+     // prefetchNum >= 1 (Math.max(1, ...) in the ctor). So the ring head is *always* the
+     // segment advance() is asking for.
+     int headSlot = nextConsumeIndex % prefetchNum;
+     RemoteLogSegment headSegment = prefetchSegments[headSlot];
+     assert isSameSegmentId(segment, headSegment)
+             : "prefetch ring head "
+                     + (headSegment == null ? "null" : headSegment.remoteLogSegmentId())
+                     + " does not match requested "
+                     + segment.remoteLogSegmentId();
+     Future<File> headFuture = prefetchSlots[headSlot];
+     // Eagerly null the slot refs so the ring never keeps stale references even if the
+     // consumer throws below — also releases the RemoteLogSegment for GC early.
+     prefetchSlots[headSlot] = null;
+     prefetchSegments[headSlot] = null;
+     nextConsumeIndex++;
+     try {
+         return headFuture.get();
+     } catch (InterruptedException e) {
+         // The future was already removed from the ring above, so nothing else will ever
+         // cancel it or delete its file. Do it here, *before* restoring the interrupt flag,
+         // so the cleanup's own get() is not affected by the pending interrupt.
+         if (!headFuture.cancel(true)) {
+             // cancel() refused: the download completed in the meantime — drop its file.
+             cleanupCompletedFuture(headFuture);
+         }
+         Thread.currentThread().interrupt();
+         throw new IOException(
+                 "Interrupted while waiting for remote log segment download: "
+                         + segment.remoteLogSegmentId(),
+                 e);
+     } catch (CancellationException e) {
+         LOG.warn(
+                 "Prefetched segment {} was cancelled, fallback to sync download.",
+                 segment.remoteLogSegmentId(),
+                 e);
+         return downloadSegmentWithRetry(segment);
+     } catch (ExecutionException e) {
+         LOG.warn(
+                 "Prefetched segment {} failed even after async retries, "
+                         + "fallback to one more synchronous retry round.",
+                 segment.remoteLogSegmentId(),
+                 e.getCause());
+         return downloadSegmentWithRetry(segment);
+     } finally {
+         // Window slot released — try to submit the next prefetch right away.
+         fillPrefetchWindow();
+     }
+ }
+
+ /**
+  * Null-safe comparison of two segments by their remote log segment id.
+  *
+  * @return {@code true} only when both segments are non-null and their ids are equal
+  */
+ private boolean isSameSegmentId(RemoteLogSegment left, RemoteLogSegment right) {
+     if (left == null || right == null) {
+         return false;
+     }
+     return left.remoteLogSegmentId().equals(right.remoteLogSegmentId());
+ }
+
+ /**
+  * Tops up the prefetch window without blocking. Download tasks are submitted for upcoming
+  * segments until the window is full, the segment list is exhausted, a segment starting at or
+  * beyond {@code localLogStartOffset} is reached, or the executor refuses a task. Does nothing
+  * once the fetcher is closed.
+  */
+ private void fillPrefetchWindow() {
+     if (closed) {
+         return;
+     }
+
+     while (nextPrefetchIndex < segments.size()) {
+         final RemoteLogSegment candidate = segments.get(nextPrefetchIndex);
+
+         if (candidate.remoteLogEndOffset() < currentOffset) {
+             // Entirely before currentOffset: step over it without downloading.
+             nextPrefetchIndex++;
+             continue;
+         }
+
+         // Stop when the segment starts at or beyond localLogStartOffset (nothing more to
+         // prefetch), or when the window is full (wait for the consumer to advance).
+         if (candidate.remoteLogStartOffset() >= localLogStartOffset
+                 || prefetchWindowSize() >= prefetchNum) {
+             return;
+         }
+
+         final Future<File> download;
+         try {
+             // submit(Callable) returns a FutureTask whose cancel(true) actually calls
+             // Thread.interrupt() on the worker, unlike CompletableFuture.cancel which only
+             // flips state, so in-flight downloads can really be stopped instead of
+             // orphaning their files in tempDir.
+             download =
+                     downloadExecutor.submit(
+                             (Callable<File>) () -> downloadSegmentWithRetry(candidate));
+         } catch (Throwable submitFailure) {
+             // The executor rejected the task (e.g. we're shutting down); stop refilling.
+             // No slot has been claimed yet, so there is nothing to roll back.
+             LOG.debug(
+                     "Failed to submit prefetch for segment {} (executor likely shutting down).",
+                     candidate.remoteLogSegmentId(),
+                     submitFailure);
+             return;
+         }
+
+         // Claim the ring slot only after the task was accepted, then advance the cursor.
+         final int ringSlot = nextPrefetchIndex % prefetchNum;
+         prefetchSlots[ringSlot] = download;
+         prefetchSegments[ringSlot] = candidate;
+         nextPrefetchIndex++;
+         LOG.debug(
+                 "Prefetching remote log segment {} for bucket {} (window size={}, free slots={}).",
+                 candidate.remoteLogSegmentId(),
+                 tableBucket,
+                 prefetchWindowSize(),
+                 prefetchNum - prefetchWindowSize());
+     }
+ }
+
+ /**
+  * Cancel or clean up every entry currently in the window. Used on close, iterator failure,
+  * and stale-ring fallback.
+  *
+  * <p>When the loop runs to completion, {@code nextConsumeIndex == nextPrefetchIndex} holds,
+  * so the window is empty. If this thread is interrupted while collecting a finished
+  * download, the method returns early and the remaining entries stay in the ring.
+  */
+ private void drainPrefetchWindow() {
+     while (nextConsumeIndex < nextPrefetchIndex) {
+         int slot = nextConsumeIndex % prefetchNum;
+         Future<File> future = prefetchSlots[slot];
+         prefetchSlots[slot] = null;
+         prefetchSegments[slot] = null;
+         nextConsumeIndex++;
+         if (future == null) {
+             // fillPrefetchWindow() advances nextPrefetchIndex past skipped segments without
+             // submitting a download, so an index inside the window may have no future.
+             // Nothing to cancel or delete for it.
+             continue;
+         }
+         // Either the download had already finished, or cancel() lost the race against
+         // completion; in both cases a downloaded file may exist and must be deleted.
+         boolean mayHaveFile = future.isDone() || !future.cancel(true);
+         if (mayHaveFile && !cleanupCompletedFuture(future)) {
+             // Interrupted during cleanup: bail out and leave the rest of the ring as is.
+             return;
+         }
+     }
+ }
+
+ /**
+ * Drain a future that is (now) known to be completed: pull its file
out via {@link
+ * Future#get()} and delete it. {@code isDone()} guarantees {@code
get()} does not block.
+ *
+ * @return {@code true} on a clean drain, {@code false} if this thread
was interrupted while
+ * calling {@code get()} (caller should bail out and let close()'s
shutdownNow path
+ * reclaim the remaining slots).
+ */
+ private boolean cleanupCompletedFuture(Future<File> future) {
+ try {
+ // isDone() is true so get() will not block; it returns either
the
+ // successfully downloaded file, or throws
Cancellation/Execution.
+ cleanupUnusedPrefetchedFile(future.get());
+ } catch (CancellationException | ExecutionException ignored) {
+ // no local file to clean up
+ } catch (InterruptedException e) {
+ // A completed future never actually blocks here, so an
interrupt on this
+ // thread came from elsewhere. Restore the flag and bail out —
close()'s
+ // follow-up shutdownNow()/awaitTermination path will reclaim
whatever's
+ // left in the ring.
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return true;
+ }
+
+ /**
+  * Best-effort deletion of a prefetched segment file that will not be consumed. A
+  * {@code null} argument is ignored; a failed delete is logged at WARN and not rethrown.
+  *
+  * @param prefetchedFile the downloaded file to remove, may be {@code null}
+  */
+ private void cleanupUnusedPrefetchedFile(File prefetchedFile) {
+     if (prefetchedFile != null) {
+         try {
+             Files.deleteIfExists(prefetchedFile.toPath());
+         } catch (IOException deleteFailure) {
+             LOG.warn(
+                     "Failed to cleanup unused prefetched segment file {} for table bucket {}.",
+                     prefetchedFile,
+                     tableBucket,
+                     deleteFailure);
+         }
+     }
+ }
+
private void closeCurrentFileLogRecords() {
Review Comment:
Thanks for the suggestion. I added a `currentLocalFile` field to track the
currently opened segment file; `closeCurrentFileLogRecords()` now deletes that
file after closing the `FileLogRecords`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]