Kaixuan-Duan commented on code in PR #3132:
URL: https://github.com/apache/fluss/pull/3132#discussion_r3224023215
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/RemoteLogFetcher.java:
##########
@@ -136,31 +188,60 @@ public Iterable<LogRecordBatch> fetch(long startOffset, long localLogStartOffset
}
LOG.info(
- "Found {} remote log segments for table bucket {} from offset {} to localLogStartOffset {}",
+ "Found {} remote log segments for table bucket {} from offset {} to localLogStartOffset {} "
+ + "(prefetchNum={}, downloadThreads={})",
segments.size(),
tableBucket,
startOffset,
- localLogStartOffset);
+ localLogStartOffset,
+ prefetchNum,
+ ((java.util.concurrent.ThreadPoolExecutor) downloadExecutor).getCorePoolSize());
RemoteLogBatchIterator iterator =
new RemoteLogBatchIterator(segments, startOffset, localLogStartOffset);
this.activeIterator = iterator;
+ // Kick off the initial prefetch window eagerly so that the very first advance() can
+ // consume from the ring instead of falling back to a synchronous download.
+ iterator.fillPrefetchWindow();
return () -> iterator;
}
@Override
public void close() {
- // Close any active iterator to release file handles
- if (activeIterator != null) {
- activeIterator.close();
- activeIterator = null;
- }
- // Remove the entire "tmp" parent directory to clean up our subdirectory as well
- // as any stale recovery directories left by a previous failed recovery.
- Path tmpDir = tempDir.getParent();
- if (tmpDir != null && Files.exists(tmpDir)) {
- LOG.info("Cleaning up remote log recovery tmp dir: {}", tmpDir);
- deleteDirectoryQuietly(tmpDir.toFile());
+ try {
+ // Close any active iterator to release file handles
+ if (activeIterator != null) {
+ activeIterator.close();
+ activeIterator = null;
+ }
+ } finally {
+ downloadExecutor.shutdownNow();
+
+ boolean terminated = false;
+ try {
+ terminated =
+ downloadExecutor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn(
+ "Interrupted while waiting for remote log fetcher download executor termination for table bucket {}.",
+ tableBucket,
+ e);
+ }
+
+ if (terminated) {
Review Comment:
Good catch. I removed the tempDir.getParent() deletion. close() now always
makes a best-effort attempt to clean up its own UUID subdirectory, regardless
of whether the executor terminated, and logs a warning if files are still in use.
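
For reference, here is a minimal sketch of the cleanup behavior described above. It is not the code in this PR: the class name `ScopedTempDirCloser`, the helper `deleteQuietly`, and the use of `System.err` in place of the real logger are all assumptions made for illustration. It shows the intended shape only: stop the download executor, wait briefly, then delete the instance's own subdirectory and leave the parent directory alone.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/** Hypothetical stand-in for the fetcher's close() logic; names are illustrative. */
final class ScopedTempDirCloser implements AutoCloseable {
    private final Path tempDir; // this instance's own (UUID-named) subdirectory
    private final ExecutorService downloadExecutor;

    ScopedTempDirCloser(Path tempDir, ExecutorService downloadExecutor) {
        this.tempDir = tempDir;
        this.downloadExecutor = downloadExecutor;
    }

    @Override
    public void close() {
        downloadExecutor.shutdownNow();
        boolean terminated = false;
        try {
            terminated = downloadExecutor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        if (!terminated) {
            System.err.println("WARN: download tasks may still hold files under " + tempDir);
        }
        // Cleanup is attempted regardless of termination, and only for our own
        // subdirectory; the shared parent directory is never touched.
        if (!deleteQuietly(tempDir)) {
            System.err.println("WARN: could not fully delete " + tempDir);
        }
    }

    /** Deletes dir recursively, children first; returns true if dir no longer exists. */
    static boolean deleteQuietly(Path dir) {
        if (!Files.exists(dir)) {
            return true;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder())
                    .forEach(
                            p -> {
                                try {
                                    Files.deleteIfExists(p);
                                } catch (IOException e) {
                                    System.err.println("WARN: failed to delete " + p + ": " + e);
                                }
                            });
        } catch (IOException | UncheckedIOException e) {
            System.err.println("WARN: failed to walk " + dir + ": " + e);
        }
        return !Files.exists(dir);
    }
}
```

Limiting deletion to the instance's own subdirectory means a concurrent recovery that uses a sibling directory under the same parent is not affected by this close().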