This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 076cbd6  chore: introduce prefetch to improve log poll performance 
(#103)
076cbd6 is described below

commit 076cbd62b0bf8646e4af330275cc23248ab39505
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Dec 21 15:22:05 2025 +0800

    chore: introduce prefetch to improve log poll performance (#103)
---
 crates/fluss/Cargo.toml                            |   1 +
 crates/fluss/src/client/credentials.rs             |  30 +-
 crates/fluss/src/client/table/log_fetch_buffer.rs  | 376 +++++++++++++
 crates/fluss/src/client/table/mod.rs               |   1 +
 crates/fluss/src/client/table/remote_log.rs        | 195 +++++--
 crates/fluss/src/client/table/scanner.rs           | 586 ++++++++++++++++-----
 crates/fluss/src/record/arrow.rs                   | 113 ++--
 crates/fluss/tests/integration/table.rs            |   6 +-
 .../fluss/tests/integration/table_remote_scan.rs   |   8 +-
 9 files changed, 1077 insertions(+), 239 deletions(-)

diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index cdba9de..27604ee 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -58,6 +58,7 @@ url = "2.5.7"
 uuid = { version = "1.10", features = ["v4"] }
 tempfile = "3.23.0"
 snafu = "0.8.3"
+scopeguard = "1.2.0"
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/client/credentials.rs 
b/crates/fluss/src/client/credentials.rs
index 6b07d08..8adfe48 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -90,20 +90,20 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> 
Option<(String, bool)> {
 
 pub struct CredentialsCache {
     inner: RwLock<Option<CachedToken>>,
+    rpc_client: Arc<RpcClient>,
+    metadata: Arc<Metadata>,
 }
 
 impl CredentialsCache {
-    pub fn new() -> Self {
+    pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
         Self {
             inner: RwLock::new(None),
+            rpc_client,
+            metadata,
         }
     }
 
-    pub async fn get_or_refresh(
-        &self,
-        rpc_client: &Arc<RpcClient>,
-        metadata: &Arc<Metadata>,
-    ) -> Result<HashMap<String, String>> {
+    pub async fn get_or_refresh(&self) -> Result<HashMap<String, String>> {
         {
             let guard = self.inner.read();
             if let Some(cached) = guard.as_ref() {
@@ -113,17 +113,13 @@ impl CredentialsCache {
             }
         }
 
-        self.refresh_from_server(rpc_client, metadata).await
+        self.refresh_from_server().await
     }
 
-    async fn refresh_from_server(
-        &self,
-        rpc_client: &Arc<RpcClient>,
-        metadata: &Arc<Metadata>,
-    ) -> Result<HashMap<String, String>> {
-        let cluster = metadata.get_cluster();
+    async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
+        let cluster = self.metadata.get_cluster();
         let server_node = cluster.get_one_available_server();
-        let conn = rpc_client.get_connection(server_node).await?;
+        let conn = self.rpc_client.get_connection(server_node).await?;
 
         let request = GetSecurityTokenRequest::new();
         let response = conn.request(request).await?;
@@ -158,9 +154,3 @@ impl CredentialsCache {
         Ok(props)
     }
 }
-
-impl Default for CredentialsCache {
-    fn default() -> Self {
-        Self::new()
-    }
-}
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
new file mode 100644
index 0000000..cee104e
--- /dev/null
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::metadata::TableBucket;
+use crate::record::{
+    LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, 
ScanRecord,
+};
+use parking_lot::Mutex;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::Notify;
+
+/// Represents a completed fetch that can be consumed
+pub trait CompletedFetch: Send + Sync {
+    fn table_bucket(&self) -> &TableBucket;
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+    fn is_consumed(&self) -> bool;
+    fn drain(&mut self);
+    fn size_in_bytes(&self) -> usize;
+    fn high_watermark(&self) -> i64;
+    fn is_initialized(&self) -> bool;
+    fn set_initialized(&mut self);
+    fn next_fetch_offset(&self) -> i64;
+}
+
+/// Represents a pending fetch that is waiting to be completed
+pub trait PendingFetch: Send + Sync {
+    fn table_bucket(&self) -> &TableBucket;
+    fn is_completed(&self) -> bool;
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>>;
+}
+
+/// Thread-safe buffer for completed fetches
+pub struct LogFetchBuffer {
+    completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
+    pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn 
PendingFetch>>>>,
+    next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
+    not_empty_notify: Notify,
+    woken_up: Arc<AtomicBool>,
+}
+
+impl LogFetchBuffer {
+    pub fn new() -> Self {
+        Self {
+            completed_fetches: Mutex::new(VecDeque::new()),
+            pending_fetches: Mutex::new(HashMap::new()),
+            next_in_line_fetch: Mutex::new(None),
+            not_empty_notify: Notify::new(),
+            woken_up: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    /// Check if the buffer is empty
+    pub fn is_empty(&self) -> bool {
+        self.completed_fetches.lock().is_empty()
+    }
+
+    /// Wait for the buffer to become non-empty, with timeout
+    /// Returns true if data became available or `wakeup` was called, false on timeout
+    pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+        let deadline = std::time::Instant::now() + timeout;
+
+        loop {
+            // Check if buffer is not empty
+            if !self.is_empty() {
+                return true;
+            }
+
+            // Check if woken up
+            if self.woken_up.swap(false, Ordering::Acquire) {
+                return true;
+            }
+
+            // Check if timeout
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                return false;
+            }
+
+            // Wait for notification with remaining time
+            let remaining = deadline - now;
+            let notified = self.not_empty_notify.notified();
+            tokio::select! {
+                _ = tokio::time::sleep(remaining) => {
+                    return false; // Timeout
+                }
+                _ = notified => {
+                    // Got notification, check again
+                    continue;
+                }
+            }
+        }
+    }
+
+    #[allow(dead_code)]
+    /// Wake up any waiting threads
+    pub fn wakeup(&self) {
+        self.woken_up.store(true, Ordering::Release);
+        self.not_empty_notify.notify_waiters();
+    }
+
+    /// Add a pending fetch to the buffer
+    pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
+        let table_bucket = pending_fetch.table_bucket().clone();
+        self.pending_fetches
+            .lock()
+            .entry(table_bucket)
+            .or_default()
+            .push_back(pending_fetch);
+    }
+
+    /// Try to complete pending fetches in order, converting them to completed 
fetches
+    pub fn try_complete(&self, table_bucket: &TableBucket) {
+        // Collect completed fetches while holding the pending_fetches lock,
+        // then push them to completed_fetches after releasing it to avoid
+        // holding both locks simultaneously.
+        let mut completed_to_push: Vec<Box<dyn CompletedFetch>> = Vec::new();
+        let mut has_completed = false;
+        {
+            let mut pending_map = self.pending_fetches.lock();
+            if let Some(pendings) = pending_map.get_mut(table_bucket) {
+                while let Some(front) = pendings.front() {
+                    if front.is_completed() {
+                        let pending = pendings.pop_front().unwrap();
+                        match pending.to_completed_fetch() {
+                            Ok(completed) => {
+                                completed_to_push.push(completed);
+                                has_completed = true;
+                            }
+                            Err(e) => {
+                                // todo: handle exception?
+                                log::error!("Error when completing: {e}");
+                            }
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                if has_completed && pendings.is_empty() {
+                    pending_map.remove(table_bucket);
+                }
+            }
+        }
+
+        if !completed_to_push.is_empty() {
+            let mut completed_queue = self.completed_fetches.lock();
+            for completed in completed_to_push {
+                completed_queue.push_back(completed);
+            }
+        }
+
+        if has_completed {
+            // Signal that buffer is not empty
+            self.not_empty_notify.notify_waiters();
+        }
+    }
+
+    /// Add a completed fetch to the buffer
+    pub fn add(&self, completed_fetch: Box<dyn CompletedFetch>) {
+        let table_bucket = completed_fetch.table_bucket();
+        let mut pending_map = self.pending_fetches.lock();
+
+        if let Some(pendings) = pending_map.get_mut(table_bucket)
+            && !pendings.is_empty()
+        {
+            
pendings.push_back(Box::new(CompletedPendingFetch::new(completed_fetch)));
+            return;
+        }
+        // If there's no pending fetch for this table_bucket,
+        // directly add to completed_fetches
+        self.completed_fetches.lock().push_back(completed_fetch);
+        self.not_empty_notify.notify_waiters();
+    }
+
+    /// Poll the next completed fetch
+    pub fn poll(&self) -> Option<Box<dyn CompletedFetch>> {
+        self.completed_fetches.lock().pop_front()
+    }
+
+    /// Get the next in line fetch
+    pub fn next_in_line_fetch(&self) -> Option<Box<dyn CompletedFetch>> {
+        self.next_in_line_fetch.lock().take()
+    }
+
+    /// Set the next in line fetch
+    pub fn set_next_in_line_fetch(&self, fetch: Option<Box<dyn 
CompletedFetch>>) {
+        *self.next_in_line_fetch.lock() = fetch;
+    }
+
+    /// Get the set of buckets that have buffered data
+    pub fn buffered_buckets(&self) -> Vec<TableBucket> {
+        let mut buckets = Vec::new();
+
+        let next_in_line_fetch = self.next_in_line_fetch.lock();
+        if let Some(complete_fetch) = next_in_line_fetch.as_ref() {
+            if !complete_fetch.is_consumed() {
+                buckets.push(complete_fetch.table_bucket().clone());
+            }
+        }
+
+        let completed = self.completed_fetches.lock();
+        for fetch in completed.iter() {
+            buckets.push(fetch.table_bucket().clone());
+        }
+        let pending = self.pending_fetches.lock();
+        buckets.extend(pending.keys().cloned());
+        buckets
+    }
+}
+
+impl Default for LogFetchBuffer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// A wrapper that makes a completed fetch look like a pending fetch
+struct CompletedPendingFetch {
+    completed_fetch: Box<dyn CompletedFetch>,
+}
+
+impl CompletedPendingFetch {
+    fn new(completed_fetch: Box<dyn CompletedFetch>) -> Self {
+        Self { completed_fetch }
+    }
+}
+
+impl PendingFetch for CompletedPendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        self.completed_fetch.table_bucket()
+    }
+
+    fn is_completed(&self) -> bool {
+        true
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        Ok(self.completed_fetch)
+    }
+}
+
+/// Default implementation of CompletedFetch for in-memory log records
+pub struct DefaultCompletedFetch {
+    table_bucket: TableBucket,
+    log_record_batch: LogRecordsBatches,
+    read_context: ReadContext,
+    next_fetch_offset: i64,
+    high_watermark: i64,
+    size_in_bytes: usize,
+    consumed: bool,
+    initialized: bool,
+    records_read: usize,
+    current_record_iterator: Option<LogRecordIterator>,
+    current_record_batch: Option<LogRecordBatch>,
+}
+
+impl DefaultCompletedFetch {
+    pub fn new(
+        table_bucket: TableBucket,
+        log_record_batch: LogRecordsBatches,
+        size_in_bytes: usize,
+        read_context: ReadContext,
+        fetch_offset: i64,
+        high_watermark: i64,
+    ) -> Result<Self> {
+        Ok(Self {
+            table_bucket,
+            log_record_batch,
+            read_context,
+            next_fetch_offset: fetch_offset,
+            high_watermark,
+            size_in_bytes,
+            consumed: false,
+            initialized: false,
+            records_read: 0,
+            current_record_iterator: None,
+            current_record_batch: None,
+        })
+    }
+
+    /// Get the next fetched record, handling batch iteration and record 
skipping
+    fn next_fetched_record(&mut self) -> Result<Option<ScanRecord>> {
+        loop {
+            if let Some(record) = self
+                .current_record_iterator
+                .as_mut()
+                .and_then(Iterator::next)
+            {
+                if record.offset() >= self.next_fetch_offset {
+                    return Ok(Some(record));
+                }
+            } else if let Some(batch) = self.log_record_batch.next() {
+                self.current_record_iterator = 
Some(batch.records(&self.read_context)?);
+                self.current_record_batch = Some(batch);
+            } else {
+                if let Some(batch) = self.current_record_batch.take() {
+                    self.next_fetch_offset = batch.next_log_offset();
+                }
+                self.drain();
+                return Ok(None);
+            }
+        }
+    }
+}
+
+impl CompletedFetch for DefaultCompletedFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.table_bucket
+    }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> 
{
+        // todo: handle corrupt_last_record
+        if self.consumed {
+            return Ok(Vec::new());
+        }
+
+        let mut scan_records = Vec::new();
+
+        for _ in 0..max_records {
+            if let Some(record) = self.next_fetched_record()? {
+                self.next_fetch_offset = record.offset() + 1;
+                self.records_read += 1;
+                scan_records.push(record);
+            } else {
+                break;
+            }
+        }
+
+        Ok(scan_records)
+    }
+
+    fn is_consumed(&self) -> bool {
+        self.consumed
+    }
+
+    fn drain(&mut self) {
+        self.consumed = true;
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.size_in_bytes
+    }
+
+    fn high_watermark(&self) -> i64 {
+        self.high_watermark
+    }
+
+    fn is_initialized(&self) -> bool {
+        self.initialized
+    }
+
+    fn set_initialized(&mut self) {
+        self.initialized = true;
+    }
+
+    fn next_fetch_offset(&self) -> i64 {
+        self.next_fetch_offset
+    }
+}
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 9972247..e2cf9e6 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;
 
 mod append;
 
+mod log_fetch_buffer;
 mod remote_log;
 mod scanner;
 mod writer;
diff --git a/crates/fluss/src/client/table/remote_log.rs 
b/crates/fluss/src/client/table/remote_log.rs
index 10273dd..d9abd19 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -14,16 +14,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use crate::client::table::log_fetch_buffer::{CompletedFetch, 
DefaultCompletedFetch, PendingFetch};
 use crate::error::{Error, Result};
 use crate::io::{FileIO, Storage};
 use crate::metadata::TableBucket;
 use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
+use crate::record::{LogRecordsBatches, ReadContext};
 use crate::util::delete_file;
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use std::collections::HashMap;
 use std::io;
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 use tempfile::TempDir;
 use tokio::io::AsyncWriteExt;
 use tokio::sync::oneshot;
@@ -70,45 +72,121 @@ pub struct RemoteLogFetchInfo {
 }
 
 impl RemoteLogFetchInfo {
-    pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) 
-> Result<Self> {
+    pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) 
-> Self {
         let segments = info
             .remote_log_segments
             .iter()
             .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone()))
             .collect();
 
-        Ok(Self {
+        Self {
             remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
             partition_name: info.partition_name.clone(),
             remote_log_segments: segments,
             first_start_pos: info.first_start_pos.unwrap_or(0),
-        })
+        }
     }
 }
 
+type CompletionCallback = Box<dyn Fn() + Send + Sync>;
+
 /// Future for a remote log download request
 pub struct RemoteLogDownloadFuture {
-    receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
+    result: Arc<Mutex<Option<Result<Vec<u8>>>>>,
+    completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
+    // todo: add recycleCallback
 }
 
 impl RemoteLogDownloadFuture {
-    pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self {
+    pub fn new(receiver: oneshot::Receiver<Result<Vec<u8>>>) -> Self {
+        let result = Arc::new(Mutex::new(None));
+        let result_clone = Arc::clone(&result);
+        let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> =
+            Arc::new(Mutex::new(Vec::new()));
+        let callbacks_clone = Arc::clone(&completion_callbacks);
+
+        // Spawn a task to wait for the download and update result, then call 
callbacks
+        tokio::spawn(async move {
+            let download_result = match receiver.await {
+                Ok(Ok(path)) => Ok(path),
+                Ok(Err(e)) => Err(e),
+                Err(e) => Err(Error::UnexpectedError {
+                    message: format!("Download & Read future cancelled: 
{e:?}"),
+                    source: None,
+                }),
+            };
+
+            *result_clone.lock() = Some(download_result);
+
+            // Call all registered callbacks
+            // We need to take the callbacks to avoid holding the lock while 
calling them
+            // This also ensures that any callbacks registered after this 
point will be called immediately
+            let callbacks: Vec<CompletionCallback> = {
+                let mut callbacks_guard = callbacks_clone.lock();
+                std::mem::take(&mut *callbacks_guard)
+            };
+            for callback in callbacks {
+                callback();
+            }
+
+            // After calling callbacks, any new callbacks registered will see 
is_done() == true
+            // and will be called immediately in on_complete()
+        });
+
         Self {
-            receiver: Some(receiver),
+            result,
+            completion_callbacks,
         }
     }
 
-    /// Get the downloaded file path
-    pub async fn get_file_path(&mut self) -> Result<PathBuf> {
-        let receiver = self.receiver.take().ok_or_else(|| 
Error::UnexpectedError {
-            message: "Downloaded file already consumed".to_string(),
-            source: None,
-        })?;
-
-        receiver.await.map_err(|e| Error::UnexpectedError {
-            message: format!("Download future cancelled: {e:?}"),
-            source: None,
-        })?
+    /// Register a callback to be called when download completes (similar to 
Java's onComplete)
+    pub fn on_complete<F>(&self, callback: F)
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        // Acquire callbacks lock first to ensure atomicity of the 
check-and-register operation
+        let mut callbacks_guard = self.completion_callbacks.lock();
+
+        // Check completion status while holding the callbacks lock.
+        // This ensures that:
+        // 1. If the task completes between checking is_done() and registering 
the callback,
+        //    we'll see the completion state correctly
+        // 2. The background task cannot clear the callbacks list while we're 
checking/registering
+        let is_done = self.result.lock().is_some();
+
+        if is_done {
+            // If already completed, call immediately (drop lock first to 
avoid deadlock)
+            drop(callbacks_guard);
+            callback();
+        } else {
+            // Register the callback while holding the callbacks lock.
+            // This ensures that even if the background task completes right 
after we check
+            // is_done(), it will wait for us to release the lock before 
taking callbacks.
+            // When it does take callbacks, it will see our callback in the 
list and execute it.
+            callbacks_guard.push(Box::new(callback));
+            // Lock is automatically released here
+        }
+    }
+
+    pub fn is_done(&self) -> bool {
+        self.result.lock().is_some()
+    }
+
+    /// Get the downloaded log bytes (synchronous, only works after is_done() 
returns true)
+    pub fn get_remote_log_bytes(&self) -> Result<Vec<u8>> {
+        // todo: handle download fail
+        let guard = self.result.lock();
+        match guard.as_ref() {
+            Some(Ok(path)) => Ok(path.clone()),
+            Some(Err(e)) => Err(Error::IoUnexpectedError {
+                message: format!("Fail to get remote log bytes: {e}"),
+                source: io::Error::other(format!("{e:?}")),
+            }),
+            None => Err(Error::IoUnexpectedError {
+                message: "Get remote log bytes not completed yet".to_string(),
+                source: io::Error::other("Get remote log bytes not completed 
yet"),
+            }),
+        }
     }
 }
 
@@ -135,25 +213,38 @@ impl RemoteLogDownloader {
         &self,
         remote_log_tablet_dir: &str,
         segment: &RemoteLogSegment,
-    ) -> Result<RemoteLogDownloadFuture> {
+    ) -> RemoteLogDownloadFuture {
         let (sender, receiver) = oneshot::channel();
         let local_file_name = segment.local_file_name();
         let local_file_path = self.local_log_dir.path().join(&local_file_name);
         let remote_path = self.build_remote_path(remote_log_tablet_dir, 
segment);
         let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
         let remote_fs_props = self.remote_fs_props.read().clone();
-        // Spawn async download task
+        // Spawn async download & read task
         tokio::spawn(async move {
-            let result = Self::download_file(
-                &remote_log_tablet_dir,
-                &remote_path,
-                &local_file_path,
-                &remote_fs_props,
-            )
+            let result = async {
+                let file_path = Self::download_file(
+                    &remote_log_tablet_dir,
+                    &remote_path,
+                    &local_file_path,
+                    &remote_fs_props,
+                )
+                .await?;
+                let bytes = tokio::fs::read(&file_path).await?;
+
+                // Delete the downloaded local file to free disk (async, but 
we'll do it in background)
+                let file_path_clone = file_path.clone();
+                tokio::spawn(async move {
+                    let _ = delete_file(file_path_clone).await;
+                });
+
+                Ok(bytes)
+            }
             .await;
+
             let _ = sender.send(result);
         });
-        Ok(RemoteLogDownloadFuture::new(receiver))
+        RemoteLogDownloadFuture::new(receiver)
     }
 
     /// Build the remote path for a log segment
@@ -261,9 +352,7 @@ pub struct RemotePendingFetch {
     segment: RemoteLogSegment,
     download_future: RemoteLogDownloadFuture,
     pos_in_log_segment: i32,
-    #[allow(dead_code)]
     fetch_offset: i64,
-    #[allow(dead_code)]
     high_watermark: i64,
     read_context: ReadContext,
 }
@@ -286,32 +375,42 @@ impl RemotePendingFetch {
             read_context,
         }
     }
+}
+
+impl PendingFetch for RemotePendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.segment.table_bucket
+    }
 
-    /// Convert to completed fetch by reading the downloaded file
-    pub async fn convert_to_completed_fetch(
-        mut self,
-    ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
-        let file_path = self.download_future.get_file_path().await?;
-        let file_data = tokio::fs::read(&file_path).await?;
+    fn is_completed(&self) -> bool {
+        self.download_future.is_done()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        // Get the log bytes (this should only be called when is_completed() 
returns true)
+        let mut data = self.download_future.get_remote_log_bytes()?;
 
         // Slice the data if needed
         let data = if self.pos_in_log_segment > 0 {
-            &file_data[self.pos_in_log_segment as usize..]
+            data.split_off(self.pos_in_log_segment as usize)
         } else {
-            &file_data
+            data
         };
 
-        // delete the downloaded local file to free disk
-        delete_file(file_path).await;
+        let size_in_bytes = data.len();
 
-        // Parse log records (remote log contains full data, need client-side 
projection)
-        let mut fetch_records = vec![];
-        for log_record in &mut LogRecordsBatchs::new(data) {
-            
fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
-        }
+        let log_record_batch = LogRecordsBatches::new(data);
+
+        // Create DefaultCompletedFetch from the data
+        let completed_fetch = DefaultCompletedFetch::new(
+            self.segment.table_bucket,
+            log_record_batch,
+            size_in_bytes,
+            self.read_context,
+            self.fetch_offset,
+            self.high_watermark,
+        )?;
 
-        let mut result = HashMap::new();
-        result.insert(self.segment.table_bucket.clone(), fetch_records);
-        Ok(result)
+        Ok(Box::new(completed_fetch))
     }
 }
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index a9384d9..2246e2c 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -18,24 +18,27 @@
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
+use crate::client::table::log_fetch_buffer::{
+    CompletedFetch, DefaultCompletedFetch, LogFetchBuffer,
+};
+use crate::client::table::remote_log::{
+    RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
+};
 use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTable};
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
-use crate::rpc::RpcClient;
+use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
+use crate::rpc::{RpcClient, message};
 use crate::util::FairBucketStatusMap;
 use arrow_schema::SchemaRef;
-use parking_lot::RwLock;
-use std::collections::HashMap;
+use log::{debug, error, warn};
+use parking_lot::{Mutex, RwLock};
+use std::collections::{HashMap, HashSet};
 use std::slice::from_ref;
 use std::sync::Arc;
 use std::time::Duration;
 use tempfile::TempDir;
 
-use crate::client::table::remote_log::{
-    RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
-};
-
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
 const LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024;
@@ -171,8 +174,43 @@ impl LogScanner {
         })
     }
 
-    pub async fn poll(&self, _timeout: Duration) -> Result<ScanRecords> {
-        Ok(ScanRecords::new(self.poll_for_fetches().await?))
+    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+        let start = std::time::Instant::now();
+        let deadline = start + timeout;
+
+        loop {
+            // Try to collect fetches
+            let fetch_result = self.poll_for_fetches().await?;
+
+            if !fetch_result.is_empty() {
+                // We have data, send next round of fetches and return
+                // This enables pipelining while user processes the data
+                self.log_fetcher.send_fetches().await?;
+                return Ok(ScanRecords::new(fetch_result));
+            }
+
+            // No data available, check if we should wait
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                // Timeout reached, return empty result
+                return Ok(ScanRecords::new(HashMap::new()));
+            }
+
+            // Wait for buffer to become non-empty with remaining time
+            let remaining = deadline - now;
+            let has_data = self
+                .log_fetcher
+                .log_fetch_buffer
+                .await_not_empty(remaining)
+                .await;
+
+            if !has_data {
+                // Timeout while waiting
+                return Ok(ScanRecords::new(HashMap::new()));
+            }
+
+            // Buffer became non-empty, try again
+        }
     }
 
     pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
@@ -208,20 +246,31 @@ impl LogScanner {
     }
 
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, 
Vec<ScanRecord>>> {
-        self.log_fetcher.send_fetches_and_collect().await
+        let result = self.log_fetcher.collect_fetches()?;
+        if !result.is_empty() {
+            return Ok(result);
+        }
+
+        // send any new fetches (won't resend pending fetches).
+        self.log_fetcher.send_fetches().await?;
+
+        // Collect completed fetches from buffer
+        self.log_fetcher.collect_fetches()
     }
 }
 
-#[allow(dead_code)]
 struct LogFetcher {
-    table_path: TablePath,
     conns: Arc<RpcClient>,
-    table_info: TableInfo,
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     read_context: ReadContext,
-    remote_log_downloader: RemoteLogDownloader,
-    credentials_cache: CredentialsCache,
+    remote_read_context: ReadContext,
+    remote_log_downloader: Arc<RemoteLogDownloader>,
+    // todo: consider scheduling a background thread to update the
+    // token instead of updating it in the fetch phase
+    credentials_cache: Arc<CredentialsCache>,
+    log_fetch_buffer: Arc<LogFetchBuffer>,
+    nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
 }
 
 impl LogFetcher {
@@ -233,126 +282,306 @@ impl LogFetcher {
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
-        let read_context = Self::create_read_context(full_arrow_schema, 
projected_fields.clone());
+        let read_context =
+            Self::create_read_context(full_arrow_schema.clone(), 
projected_fields.clone(), false);
+        let remote_read_context =
+            Self::create_read_context(full_arrow_schema, 
projected_fields.clone(), true);
 
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
 
         Ok(LogFetcher {
-            table_path: table_info.table_path.clone(),
-            conns,
-            table_info,
-            metadata,
+            conns: conns.clone(),
+            metadata: metadata.clone(),
             log_scanner_status,
             read_context,
-            remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
-            credentials_cache: CredentialsCache::new(),
+            remote_read_context,
+            remote_log_downloader: 
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+            credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), 
metadata.clone())),
+            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            nodes_with_pending_fetch_requests: 
Arc::new(Mutex::new(HashSet::new())),
         })
     }
 
     fn create_read_context(
         full_arrow_schema: SchemaRef,
         projected_fields: Option<Vec<usize>>,
+        is_from_remote: bool,
     ) -> ReadContext {
         match projected_fields {
-            None => ReadContext::new(full_arrow_schema),
-            Some(fields) => 
ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+            None => ReadContext::new(full_arrow_schema, is_from_remote),
+            Some(fields) => {
+                ReadContext::with_projection_pushdown(full_arrow_schema, 
fields, is_from_remote)
+            }
         }
     }
 
-    async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, 
Vec<ScanRecord>>> {
+    /// Send fetch requests asynchronously without waiting for responses
+    async fn send_fetches(&self) -> Result<()> {
+        // todo: check update metadata like fluss-java in case leader changes
         let fetch_request = self.prepare_fetch_log_requests().await;
-        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
         for (leader, fetch_request) in fetch_request {
-            let cluster = self.metadata.get_cluster();
-            let server_node = cluster
-                .get_tablet_server(leader)
-                .expect("todo: handle leader not exist.");
-            let con = self.conns.get_connection(server_node).await?;
-
-            let fetch_response = con
-                
.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
-                .await?;
-
-            for pb_fetch_log_resp in fetch_response.tables_resp {
-                let table_id = pb_fetch_log_resp.table_id;
-                let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
-                for fetch_log_for_bucket in fetch_log_for_buckets {
-                    let bucket: i32 = fetch_log_for_bucket.bucket_id;
-                    let table_bucket = TableBucket::new(table_id, bucket);
-
-                    // Check if this is a remote log fetch
-                    if let Some(ref remote_log_fetch_info) =
-                        fetch_log_for_bucket.remote_log_fetch_info
-                    {
-                        let remote_fs_props = self
-                            .credentials_cache
-                            .get_or_refresh(&self.conns, &self.metadata)
-                            .await?;
-                        self.remote_log_downloader
-                            .set_remote_fs_props(remote_fs_props);
-                        let remote_fetch_info = RemoteLogFetchInfo::from_proto(
-                            remote_log_fetch_info,
-                            table_bucket.clone(),
-                        )?;
-
-                        if let Some(fetch_offset) =
-                            
self.log_scanner_status.get_bucket_offset(&table_bucket)
-                        {
-                            let high_watermark = 
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
-                            // Download and process remote log segments
-                            let mut pos_in_log_segment = 
remote_fetch_info.first_start_pos;
-                            let mut current_fetch_offset = fetch_offset;
-                            // todo: make segment download in parallel
-                            for (i, segment) in
-                                
remote_fetch_info.remote_log_segments.iter().enumerate()
-                            {
-                                if i > 0 {
-                                    pos_in_log_segment = 0;
-                                    current_fetch_offset = 
segment.start_offset;
-                                }
+            debug!("Adding pending request for node id {leader}");
+            // Mark this node as having a pending fetch request
+            {
+                self.nodes_with_pending_fetch_requests.lock().insert(leader);
+            }
+
+            let cluster = self.metadata.get_cluster().clone();
+
+            let conns = Arc::clone(&self.conns);
+            let log_fetch_buffer = self.log_fetch_buffer.clone();
+            let log_scanner_status = self.log_scanner_status.clone();
+            let read_context = self.read_context.clone();
+            let remote_read_context = self.remote_read_context.clone();
+            let remote_log_downloader = 
Arc::clone(&self.remote_log_downloader);
+            let creds_cache = self.credentials_cache.clone();
+            let nodes_with_pending = 
self.nodes_with_pending_fetch_requests.clone();
+
+            // Spawn async task to handle the fetch request
+            // Note: These tasks are not explicitly tracked or cancelled when 
LogFetcher is dropped.
+            // This is acceptable because:
+            // 1. Tasks will naturally complete (network requests will return 
or timeout)
+            // 2. Tasks use Arc references, so resources are properly shared
+            // 3. When the program exits, tokio runtime will clean up all tasks
+            // 4. Tasks are short-lived (network I/O operations)
+            tokio::spawn(async move {
+                // make sure it will always remove leader from pending nodes
+                let _guard = scopeguard::guard((), |_| {
+                    nodes_with_pending.lock().remove(&leader);
+                });
+
+                let server_node = cluster
+                    .get_tablet_server(leader)
+                    .expect("todo: handle leader not exist.");
+
+                let con = match conns.get_connection(server_node).await {
+                    Ok(con) => con,
+                    Err(e) => {
+                        // todo: handle failed to get connection
+                        warn!("Failed to get connection to destination node: 
{e:?}");
+                        return;
+                    }
+                };
+
+                let fetch_response = match con
+                    .request(message::FetchLogRequest::new(fetch_request))
+                    .await
+                {
+                    Ok(resp) => resp,
+                    Err(e) => {
+                        // todo: handle failure to fetch log from destination node
+                        warn!("Failed to fetch log from destination node 
{server_node:?}: {e:?}");
+                        return;
+                    }
+                };
+
+                if let Err(e) = Self::handle_fetch_response(
+                    fetch_response,
+                    &log_fetch_buffer,
+                    &log_scanner_status,
+                    &read_context,
+                    &remote_read_context,
+                    &remote_log_downloader,
+                    &creds_cache,
+                )
+                .await
+                {
+                    // todo: handle failure to process the fetch response
+                    error!("Fail to handle fetch response: {e:?}");
+                }
+            });
+        }
+
+        Ok(())
+    }
 
-                                let download_future =
-                                    
self.remote_log_downloader.request_remote_log(
-                                        
&remote_fetch_info.remote_log_tablet_dir,
-                                        segment,
-                                    )?;
-                                let pending_fetch = RemotePendingFetch::new(
-                                    segment.clone(),
-                                    download_future,
-                                    pos_in_log_segment,
-                                    current_fetch_offset,
-                                    high_watermark,
-                                    self.read_context.clone(),
-                                );
-                                let remote_records =
-                                    
pending_fetch.convert_to_completed_fetch().await?;
-                                // Update offset and merge results
-                                for (tb, records) in remote_records {
-                                    if let Some(last_record) = records.last() {
-                                        self.log_scanner_status
-                                            .update_offset(&tb, 
last_record.offset() + 1);
-                                    }
-                                    
result.entry(tb).or_default().extend(records);
+    /// Handle fetch response and add completed fetches to buffer
+    async fn handle_fetch_response(
+        fetch_response: crate::proto::FetchLogResponse,
+        log_fetch_buffer: &Arc<LogFetchBuffer>,
+        log_scanner_status: &Arc<LogScannerStatus>,
+        read_context: &ReadContext,
+        remote_read_context: &ReadContext,
+        remote_log_downloader: &Arc<RemoteLogDownloader>,
+        credentials_cache: &Arc<CredentialsCache>,
+    ) -> Result<()> {
+        for pb_fetch_log_resp in fetch_response.tables_resp {
+            let table_id = pb_fetch_log_resp.table_id;
+            let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
+
+            for fetch_log_for_bucket in fetch_log_for_buckets {
+                let bucket: i32 = fetch_log_for_bucket.bucket_id;
+                let table_bucket = TableBucket::new(table_id, bucket);
+
+                // todo: check fetch result code for per-bucket
+                let Some(fetch_offset) = 
log_scanner_status.get_bucket_offset(&table_bucket) else {
+                    debug!(
+                        "Ignoring fetch log response for bucket {table_bucket} 
because the bucket has been unsubscribed."
+                    );
+                    continue;
+                };
+
+                // Check if this is a remote log fetch
+                if let Some(ref remote_log_fetch_info) = 
fetch_log_for_bucket.remote_log_fetch_info
+                {
+                    // set remote fs props
+                    let remote_fs_props = 
credentials_cache.get_or_refresh().await?;
+                    remote_log_downloader.set_remote_fs_props(remote_fs_props);
+
+                    let remote_fetch_info =
+                        RemoteLogFetchInfo::from_proto(remote_log_fetch_info, 
table_bucket.clone());
+
+                    let high_watermark = 
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+                    Self::pending_remote_fetches(
+                        remote_log_downloader.clone(),
+                        log_fetch_buffer.clone(),
+                        remote_read_context.clone(),
+                        &table_bucket,
+                        remote_fetch_info,
+                        fetch_offset,
+                        high_watermark,
+                    );
+                } else if fetch_log_for_bucket.records.is_some() {
+                    // Handle regular in-memory records - create completed 
fetch directly
+                    let high_watermark = 
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+                    let records = 
fetch_log_for_bucket.records.unwrap_or(vec![]);
+                    let size_in_bytes = records.len();
+                    let log_record_batch = LogRecordsBatches::new(records);
+
+                    match DefaultCompletedFetch::new(
+                        table_bucket.clone(),
+                        log_record_batch,
+                        size_in_bytes,
+                        read_context.clone(),
+                        fetch_offset,
+                        high_watermark,
+                    ) {
+                        Ok(completed_fetch) => {
+                            log_fetch_buffer.add(Box::new(completed_fetch));
+                        }
+                        Err(e) => {
+                            // todo: handle error
+                            log::warn!("Failed to create completed fetch: 
{e:?}");
+                        }
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn pending_remote_fetches(
+        remote_log_downloader: Arc<RemoteLogDownloader>,
+        log_fetch_buffer: Arc<LogFetchBuffer>,
+        read_context: ReadContext,
+        table_bucket: &TableBucket,
+        remote_fetch_info: RemoteLogFetchInfo,
+        fetch_offset: i64,
+        high_watermark: i64,
+    ) {
+        // Download and process remote log segments
+        let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
+        let mut current_fetch_offset = fetch_offset;
+        for (i, segment) in 
remote_fetch_info.remote_log_segments.iter().enumerate() {
+            if i > 0 {
+                pos_in_log_segment = 0;
+                current_fetch_offset = segment.start_offset;
+            }
+
+            // todo:
+            // 1: limit the max number of threads used to download remote segments
+            // 2: introduce a priority queue that gives the earliest segment the
+            //    highest priority
+            let download_future = remote_log_downloader
+                .request_remote_log(&remote_fetch_info.remote_log_tablet_dir, 
segment);
+
+            // Register callback to be called when download completes
+            // (similar to Java's downloadFuture.onComplete)
+            // This must be done before creating RemotePendingFetch to avoid 
move issues
+            let table_bucket = table_bucket.clone();
+            let log_fetch_buffer_clone = log_fetch_buffer.clone();
+            download_future.on_complete(move || {
+                log_fetch_buffer_clone.try_complete(&table_bucket);
+            });
+
+            let pending_fetch = RemotePendingFetch::new(
+                segment.clone(),
+                download_future,
+                pos_in_log_segment,
+                current_fetch_offset,
+                high_watermark,
+                read_context.clone(),
+            );
+            // Add to pending fetches in buffer (similar to Java's 
logFetchBuffer.pend)
+            log_fetch_buffer.pend(Box::new(pending_fetch));
+        }
+    }
+
+    /// Collect completed fetches from buffer
+    /// Reference: LogFetchCollector.collectFetch in Java
+    fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> 
{
+        const MAX_POLL_RECORDS: usize = 500; // Default max poll records
+        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+        let mut records_remaining = MAX_POLL_RECORDS;
+
+        while records_remaining > 0 {
+            // Get the next in line fetch, or get a new one from buffer
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            if next_in_line.is_none() || 
next_in_line.as_ref().unwrap().is_consumed() {
+                // Get a new fetch from buffer
+                if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                    // Initialize the fetch if not already initialized
+                    if !completed_fetch.is_initialized() {
+                        let size_in_bytes = completed_fetch.size_in_bytes();
+                        match self.initialize_fetch(completed_fetch) {
+                            Ok(initialized) => {
+                                
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                continue;
+                            }
+                            Err(e) => {
+                                // Remove a completedFetch upon a parse with 
exception if
+                                // (1) it contains no records, and
+                                // (2) there are no fetched records with 
actual content preceding this
+                                // exception.
+                                if result.is_empty() && size_in_bytes == 0 {
+                                    // todo: do we need to consider it like 
java ?
+                                    // self.log_fetch_buffer.poll();
                                 }
+                                return Err(e);
                             }
-                        } else {
-                            // if the offset is null, it means the bucket has 
been unsubscribed,
-                            // skip processing and continue to the next bucket.
-                            continue;
                         }
-                    } else if fetch_log_for_bucket.records.is_some() {
-                        // Handle regular in-memory records
-                        let mut fetch_records = vec![];
-                        let data = fetch_log_for_bucket.records.unwrap();
-                        for log_record in &mut LogRecordsBatchs::new(&data) {
-                            let last_offset = log_record.last_log_offset();
-                            
fetch_records.extend(log_record.records(&self.read_context)?);
-                            self.log_scanner_status
-                                .update_offset(&table_bucket, last_offset + 1);
-                        }
-                        result.insert(table_bucket, fetch_records);
+                    } else {
+                        self.log_fetch_buffer
+                            .set_next_in_line_fetch(Some(completed_fetch));
+                    }
+                    // Note: poll() already removed the fetch from buffer, so 
no need to call poll()
+                } else {
+                    // No more fetches available
+                    break;
+                }
+            } else {
+                // Fetch records from next_in_line
+                if let Some(mut next_fetch) = next_in_line {
+                    let records =
+                        self.fetch_records_from_fetch(&mut next_fetch, 
records_remaining)?;
+
+                    if !records.is_empty() {
+                        let table_bucket = next_fetch.table_bucket().clone();
+                        // Merge with existing records for this bucket
+                        let existing = result.entry(table_bucket).or_default();
+                        let records_count = records.len();
+                        existing.extend(records);
+
+                        records_remaining = 
records_remaining.saturating_sub(records_count);
                     }
+
+                    // If the fetch is not fully consumed, put it back for the 
next round
+                    if !next_fetch.is_consumed() {
+                        self.log_fetch_buffer
+                            .set_next_in_line_fetch(Some(next_fetch));
+                    }
+                    // If consumed, next_fetch will be dropped here (which is 
correct)
                 }
             }
         }
@@ -360,6 +589,83 @@ impl LogFetcher {
         Ok(result)
     }
 
+    /// Initialize a completed fetch, checking offset match and updating high 
watermark
+    fn initialize_fetch(
+        &self,
+        mut completed_fetch: Box<dyn CompletedFetch>,
+    ) -> Result<Option<Box<dyn CompletedFetch>>> {
+        // todo: handle error in initialize fetch
+        let table_bucket = completed_fetch.table_bucket();
+        let fetch_offset = completed_fetch.next_fetch_offset();
+
+        // Check if bucket is still subscribed
+        let Some(current_offset) = 
self.log_scanner_status.get_bucket_offset(table_bucket) else {
+            warn!(
+                "Discarding stale fetch response for bucket {table_bucket:?} 
since the bucket has been unsubscribed"
+            );
+            return Ok(None);
+        };
+
+        // Check if offset matches
+        if fetch_offset != current_offset {
+            warn!(
+                "Discarding stale fetch response for bucket {table_bucket:?} 
since its offset {fetch_offset} does not match the expected offset 
{current_offset}"
+            );
+            return Ok(None);
+        }
+
+        // Update high watermark
+        let high_watermark = completed_fetch.high_watermark();
+        if high_watermark >= 0 {
+            self.log_scanner_status
+                .update_high_watermark(table_bucket, high_watermark);
+        }
+
+        completed_fetch.set_initialized();
+        Ok(Some(completed_fetch))
+    }
+
+    /// Fetch records from a completed fetch, checking offset match
+    fn fetch_records_from_fetch(
+        &self,
+        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+        max_records: usize,
+    ) -> Result<Vec<ScanRecord>> {
+        let table_bucket = next_in_line_fetch.table_bucket().clone();
+        let current_offset = 
self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+        if current_offset.is_none() {
+            warn!(
+                "Ignoring fetched records for {table_bucket:?} since the 
bucket has been unsubscribed"
+            );
+            next_in_line_fetch.drain();
+            return Ok(Vec::new());
+        }
+
+        let current_offset = current_offset.unwrap();
+        let fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+        // Check if this fetch is next in line
+        if fetch_offset == current_offset {
+            let records = next_in_line_fetch.fetch_records(max_records)?;
+            let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+            if next_fetch_offset > current_offset {
+                self.log_scanner_status
+                    .update_offset(&table_bucket, next_fetch_offset);
+            }
+
+            Ok(records)
+        } else {
+            // These records aren't next in line, ignore them
+            warn!(
+                "Ignoring fetched records for {table_bucket:?} at offset 
{fetch_offset} since the current offset is {current_offset}"
+            );
+            next_in_line_fetch.drain();
+            Ok(Vec::new())
+        }
+    }
+
     async fn prepare_fetch_log_requests(&self) -> HashMap<i32, 
FetchLogRequest> {
         let mut fetch_log_req_for_buckets = HashMap::new();
         let mut table_id = None;
@@ -372,25 +678,44 @@ impl LogFetcher {
             let offset = match 
self.log_scanner_status.get_bucket_offset(&bucket) {
                 Some(offset) => offset,
                 None => {
-                    // todo: debug
+                    debug!(
+                        "Skipping fetch request for bucket {bucket} because 
the bucket has been unsubscribed."
+                    );
                     continue;
                 }
             };
 
-            if let Some(leader) = self.get_table_bucket_leader(&bucket) {
-                let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
-                    partition_id: None,
-                    bucket_id: bucket.bucket_id(),
-                    fetch_offset: offset,
-                    // 1M
-                    max_fetch_bytes: 1024 * 1024,
-                };
-
-                fetch_log_req_for_buckets
-                    .entry(leader)
-                    .or_insert_with(Vec::new)
-                    .push(fetch_log_req_for_bucket);
-                ready_for_fetch_count += 1;
+            match self.get_table_bucket_leader(&bucket) {
+                None => {
+                    log::trace!(
+                        "Skipping fetch request for bucket {bucket} because 
leader is not available."
+                    )
+                }
+                Some(leader) => {
+                    if self
+                        .nodes_with_pending_fetch_requests
+                        .lock()
+                        .contains(&leader)
+                    {
+                        log::trace!(
+                            "Skipping fetch request for bucket {bucket} 
because previous request to server {leader} has not been processed."
+                        )
+                    } else {
+                        let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
+                            partition_id: None,
+                            bucket_id: bucket.bucket_id(),
+                            fetch_offset: offset,
+                            // 1M
+                            max_fetch_bytes: 1024 * 1024,
+                        };
+
+                        fetch_log_req_for_buckets
+                            .entry(leader)
+                            .or_insert_with(Vec::new)
+                            .push(fetch_log_req_for_bucket);
+                        ready_for_fetch_count += 1;
+                    }
+                }
             }
         }
 
@@ -427,8 +752,11 @@ impl LogFetcher {
     }
 
     fn fetchable_buckets(&self) -> Vec<TableBucket> {
-        // always available now
-        self.log_scanner_status.fetchable_buckets(|_| true)
+        // Get buckets that are not already in the buffer
+        let buffered = self.log_fetch_buffer.buffered_buckets();
+        let buffered_set: HashSet<TableBucket> = 
buffered.into_iter().collect();
+        self.log_scanner_status
+            .fetchable_buckets(|tb| !buffered_set.contains(tb))
     }
 
     fn get_table_bucket_leader(&self, tb: &TableBucket) -> Option<i32> {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 9295713..0a803ae 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -39,6 +39,7 @@ use arrow_schema::SchemaRef;
 use arrow_schema::{DataType as ArrowDataType, Field};
 use byteorder::WriteBytesExt;
 use byteorder::{ByteOrder, LittleEndian};
+use bytes::Bytes;
 use crc32c::crc32c;
 use parking_lot::Mutex;
 use std::{
@@ -347,17 +348,17 @@ pub trait ToArrow {
     fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
 }
 
-pub struct LogRecordsBatchs<'a> {
-    data: &'a [u8],
+pub struct LogRecordsBatches {
+    data: Bytes,
     current_pos: usize,
     remaining_bytes: usize,
 }
 
-impl<'a> LogRecordsBatchs<'a> {
-    pub fn new(data: &'a [u8]) -> Self {
+impl LogRecordsBatches {
+    pub fn new(data: Vec<u8>) -> Self {
         let remaining_bytes: usize = data.len();
         Self {
-            data,
+            data: Bytes::from(data),
             current_pos: 0,
             remaining_bytes,
         }
@@ -378,14 +379,17 @@ impl<'a> LogRecordsBatchs<'a> {
     }
 }
 
-impl<'a> Iterator for &'a mut LogRecordsBatchs<'a> {
-    type Item = LogRecordBatch<'a>;
+impl Iterator for LogRecordsBatches {
+    type Item = LogRecordBatch;
 
     fn next(&mut self) -> Option<Self::Item> {
         match self.next_batch_size() {
             Some(batch_size) => {
-                let data_slice = &self.data[self.current_pos..self.current_pos 
+ batch_size];
-                let record_batch = LogRecordBatch::new(data_slice);
+                let start = self.current_pos;
+                let end = start + batch_size;
+                // `Bytes::slice` returns a view that shares ownership of the
+                // underlying buffer, so the batch does not borrow from `self`
+                let record_batch = 
LogRecordBatch::new(self.data.slice(start..end));
                 self.current_pos += batch_size;
                 self.remaining_bytes -= batch_size;
                 Some(record_batch)
@@ -395,13 +399,13 @@ impl<'a> Iterator for &'a mut LogRecordsBatchs<'a> {
     }
 }
 
-pub struct LogRecordBatch<'a> {
-    data: &'a [u8],
+pub struct LogRecordBatch {
+    data: Bytes,
 }
 
 #[allow(dead_code)]
-impl<'a> LogRecordBatch<'a> {
-    pub fn new(data: &'a [u8]) -> Self {
+impl LogRecordBatch {
+    pub fn new(data: Bytes) -> Self {
         LogRecordBatch { data }
     }
 
@@ -710,6 +714,7 @@ pub struct ReadContext {
     target_schema: SchemaRef,
     full_schema: SchemaRef,
     projection: Option<Projection>,
+    is_from_remote: bool,
 }
 
 #[derive(Clone)]
@@ -723,24 +728,39 @@ struct Projection {
 }
 
 impl ReadContext {
-    pub fn new(arrow_schema: SchemaRef) -> ReadContext {
+    pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext {
         ReadContext {
             target_schema: arrow_schema.clone(),
             full_schema: arrow_schema,
             projection: None,
+            is_from_remote,
         }
     }
 
     pub fn with_projection_pushdown(
         arrow_schema: SchemaRef,
         projected_fields: Vec<usize>,
+        is_from_remote: bool,
     ) -> ReadContext {
         let target_schema = Self::project_schema(arrow_schema.clone(), 
projected_fields.as_slice());
-        let mut sorted_fields = projected_fields.clone();
-        sorted_fields.sort_unstable();
+        // todo: this logic is a little hard to understand; refactor it to
+        // follow the Java side
+        let (need_do_reorder, sorted_fields) = {
+            // currently, for remote read, arrow log doesn't support projection
+            // pushdown, so reordering is only needed when the data is not from remote
+            if !is_from_remote {
+                let mut sorted_fields = projected_fields.clone();
+                sorted_fields.sort_unstable();
+                (!sorted_fields.eq(&projected_fields), sorted_fields)
+            } else {
+                // sorted_fields won't be used when need_do_reorder is false,
+                // let's use an empty vec directly
+                (false, vec![])
+            }
+        };
 
         let project = {
-            if !sorted_fields.eq(&projected_fields) {
+            if need_do_reorder {
                 // reordering is required
                 // Calculate reordering indexes to transform from sorted order 
to user-requested order
                 let mut reordering_indexes = 
Vec::with_capacity(projected_fields.len());
@@ -778,6 +798,7 @@ impl ReadContext {
             target_schema,
             full_schema: arrow_schema,
             projection: Some(project),
+            is_from_remote,
         }
     }
 
@@ -805,17 +826,24 @@ impl ReadContext {
     pub fn record_batch(&self, data: &[u8]) -> Result<RecordBatch> {
         let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
 
-        // the record batch from server must be ordered by field pos,
-        // according to project to decide what arrow schema to use
-        // to parse the record batch
-        let resolve_schema = match self.projection {
-            Some(ref projection) => {
-                // projection, should use ordered schema by project field pos
-                projection.ordered_schema.clone()
-            }
-            None => {
-                // no projection, use target output schema
-                self.target_schema.clone()
+        let resolve_schema = {
+            // if from remote, no projection, need to use full schema
+            if self.is_from_remote {
+                self.full_schema.clone()
+            } else {
+                // the record batch from server must be ordered by field pos,
+                // according to project to decide what arrow schema to use
+                // to parse the record batch
+                match self.projection {
+                    Some(ref projection) => {
+                        // projection, should use ordered schema by project 
field pos
+                        projection.ordered_schema.clone()
+                    }
+                    None => {
+                        // no projection, use target output schema
+                        self.target_schema.clone()
+                    }
+                }
             }
         };
 
@@ -829,14 +857,27 @@ impl ReadContext {
         )?;
 
         let record_batch = match &self.projection {
-            Some(projection) if projection.reordering_needed => {
-                // Reorder columns if needed (when projection pushdown with 
non-sorted order)
-                let reordered_columns: Vec<_> = projection
-                    .reordering_indexes
-                    .iter()
-                    .map(|&idx| record_batch.column(idx).clone())
-                    .collect();
-                RecordBatch::try_new(self.target_schema.clone(), 
reordered_columns)?
+            Some(projection) => {
+                let reordered_columns = {
+                    // choose the column indexes to reorder by, if any
+                    if self.is_from_remote {
+                        Some(&projection.projected_fields)
+                    } else if projection.reordering_needed {
+                        Some(&projection.reordering_indexes)
+                    } else {
+                        None
+                    }
+                };
+                match reordered_columns {
+                    Some(reordered_columns) => {
+                        let arrow_columns = reordered_columns
+                            .iter()
+                            .map(|&idx| record_batch.column(idx).clone())
+                            .collect();
+                        RecordBatch::try_new(self.target_schema.clone(), 
arrow_columns)?
+                    }
+                    _ => record_batch,
+                }
             }
             _ => record_batch,
         };
diff --git a/crates/fluss/tests/integration/table.rs 
b/crates/fluss/tests/integration/table.rs
index a058bfe..9eec98e 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -148,7 +148,7 @@ mod table_test {
         }
 
         let scan_records = log_scanner
-            .poll(std::time::Duration::from_secs(5))
+            .poll(std::time::Duration::from_secs(60))
             .await
             .expect("Failed to poll");
 
@@ -178,7 +178,7 @@ mod table_test {
         }
 
         let scan_records_projected = log_scanner_projected
-            .poll(std::time::Duration::from_secs(5))
+            .poll(std::time::Duration::from_secs(10))
             .await
             .expect("Failed to poll");
 
@@ -227,7 +227,7 @@ mod table_test {
 
         // Poll for records
         let scan_records = log_scanner
-            .poll(tokio::time::Duration::from_secs(5))
+            .poll(tokio::time::Duration::from_secs(10))
             .await
             .expect("Failed to poll records");
 
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs 
b/crates/fluss/tests/integration/table_remote_scan.rs
index ca61ff8..bdbced9 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -175,6 +175,8 @@ mod table_remote_scan_test {
         let num_buckets = table.table_info().get_num_buckets();
         let log_scanner = table
             .new_scan()
+            .project(&[1, 0])
+            .unwrap()
             .create_log_scanner()
             .expect("Failed to create log scanner");
         for bucket_id in 0..num_buckets {
@@ -186,7 +188,7 @@ mod table_remote_scan_test {
 
         let mut records = Vec::with_capacity(record_count);
         let start = std::time::Instant::now();
-        const MAX_WAIT_DURATION: Duration = Duration::from_secs(30);
+        const MAX_WAIT_DURATION: Duration = Duration::from_secs(60);
         while records.len() < record_count {
             if start.elapsed() > MAX_WAIT_DURATION {
                 panic!(
@@ -208,8 +210,8 @@ mod table_remote_scan_test {
             let row = record.row();
             let expected_c1 = i as i32;
             let expected_c2 = format!("v{}", i);
-            assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", 
i);
-            assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index 
{}", i);
+            assert_eq!(row.get_int(1), expected_c1, "c1 mismatch at index {}", 
i);
+            assert_eq!(row.get_string(0), expected_c2, "c2 mismatch at index 
{}", i);
         }
     }
 

Reply via email to