This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch improve-log-poll
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git

commit 1e175d9b04f99a7017b1876555825572a0ed104a
Author: luoyuxia <[email protected]>
AuthorDate: Sat Dec 20 19:38:49 2025 +0800

    improve fetch logic
---
 crates/fluss/Cargo.toml                            |   1 +
 crates/fluss/src/client/credentials.rs             |  30 +-
 crates/fluss/src/client/table/log_fetch_buffer.rs  | 238 +++++--------
 crates/fluss/src/client/table/remote_log.rs        |  88 +++--
 crates/fluss/src/client/table/scanner.rs           | 395 ++++++++++++---------
 crates/fluss/src/record/arrow.rs                   | 109 ++++--
 crates/fluss/tests/integration/table.rs            |   4 +-
 .../fluss/tests/integration/table_remote_scan.rs   |   8 +-
 8 files changed, 461 insertions(+), 412 deletions(-)

diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index cdba9de..27604ee 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -58,6 +58,7 @@ url = "2.5.7"
 uuid = { version = "1.10", features = ["v4"] }
 tempfile = "3.23.0"
 snafu = "0.8.3"
+scopeguard = "1.2.0"
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/client/credentials.rs 
b/crates/fluss/src/client/credentials.rs
index 6b07d08..8adfe48 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -90,20 +90,20 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> 
Option<(String, bool)> {
 
 pub struct CredentialsCache {
     inner: RwLock<Option<CachedToken>>,
+    rpc_client: Arc<RpcClient>,
+    metadata: Arc<Metadata>,
 }
 
 impl CredentialsCache {
-    pub fn new() -> Self {
+    pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
         Self {
             inner: RwLock::new(None),
+            rpc_client,
+            metadata,
         }
     }
 
-    pub async fn get_or_refresh(
-        &self,
-        rpc_client: &Arc<RpcClient>,
-        metadata: &Arc<Metadata>,
-    ) -> Result<HashMap<String, String>> {
+    pub async fn get_or_refresh(&self) -> Result<HashMap<String, String>> {
         {
             let guard = self.inner.read();
             if let Some(cached) = guard.as_ref() {
@@ -113,17 +113,13 @@ impl CredentialsCache {
             }
         }
 
-        self.refresh_from_server(rpc_client, metadata).await
+        self.refresh_from_server().await
     }
 
-    async fn refresh_from_server(
-        &self,
-        rpc_client: &Arc<RpcClient>,
-        metadata: &Arc<Metadata>,
-    ) -> Result<HashMap<String, String>> {
-        let cluster = metadata.get_cluster();
+    async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
+        let cluster = self.metadata.get_cluster();
         let server_node = cluster.get_one_available_server();
-        let conn = rpc_client.get_connection(server_node).await?;
+        let conn = self.rpc_client.get_connection(server_node).await?;
 
         let request = GetSecurityTokenRequest::new();
         let response = conn.request(request).await?;
@@ -158,9 +154,3 @@ impl CredentialsCache {
         Ok(props)
     }
 }
-
-impl Default for CredentialsCache {
-    fn default() -> Self {
-        Self::new()
-    }
-}
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index eca4de9..5b70dcd 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::error::{Error, Result};
+use crate::error::Result;
 use crate::metadata::TableBucket;
-use crate::proto::PbFetchLogRespForBucket;
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
+use crate::record::{
+    LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, 
ScanRecord,
+};
 use parking_lot::Mutex;
 use std::collections::{HashMap, VecDeque};
 use std::sync::Arc;
@@ -30,14 +31,13 @@ use tokio::sync::Notify;
 pub trait CompletedFetch: Send + Sync {
     fn table_bucket(&self) -> &TableBucket;
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
-    fn next_fetch_offset(&self) -> i64;
     fn is_consumed(&self) -> bool;
     fn drain(&mut self);
     fn size_in_bytes(&self) -> usize;
     fn high_watermark(&self) -> i64;
     fn is_initialized(&self) -> bool;
     fn set_initialized(&mut self);
-    fn fetch_offset(&self) -> i64;
+    fn next_fetch_offset(&self) -> i64;
 }
 
 /// Represents a pending fetch that is waiting to be completed
@@ -66,34 +66,34 @@ impl LogFetchBuffer {
             woken_up: Arc::new(AtomicBool::new(false)),
         }
     }
-    
+
     /// Check if the buffer is empty
     pub fn is_empty(&self) -> bool {
         self.completed_fetches.lock().is_empty()
     }
-    
+
     /// Wait for the buffer to become non-empty, with timeout
     /// Returns true if data became available, false if timeout
     pub async fn await_not_empty(&self, timeout: Duration) -> bool {
         let deadline = std::time::Instant::now() + timeout;
-        
+
         loop {
             // Check if buffer is not empty
             if !self.is_empty() {
                 return true;
             }
-            
+
             // Check if woken up
             if self.woken_up.swap(false, Ordering::Acquire) {
                 return true;
             }
-            
+
             // Check if timeout
             let now = std::time::Instant::now();
             if now >= deadline {
                 return false;
             }
-            
+
             // Wait for notification with remaining time
             let remaining = deadline - now;
             let notified = self.not_empty_notify.notified();
@@ -108,7 +108,8 @@ impl LogFetchBuffer {
             }
         }
     }
-    
+
+    #[allow(dead_code)]
     /// Wake up any waiting threads
     pub fn wakeup(&self) {
         self.woken_up.store(true, Ordering::Release);
@@ -121,7 +122,7 @@ impl LogFetchBuffer {
         self.pending_fetches
             .lock()
             .entry(table_bucket)
-            .or_insert_with(VecDeque::new)
+            .or_default()
             .push_back(pending_fetch);
     }
 
@@ -132,50 +133,49 @@ impl LogFetchBuffer {
             let mut has_completed = false;
             while let Some(front) = pendings.front() {
                 if front.is_completed() {
-                    if let Some(pending) = pendings.pop_front() {
-                        match pending.to_completed_fetch() {
-                            Ok(completed) => {
-                                
self.completed_fetches.lock().push_back(completed);
-                                // Signal that buffer is not empty
-                                self.not_empty_notify.notify_waiters();
-                                has_completed = true;
-                            }
-                            Err(_) => {
-                                // Skip failed fetches
-                            }
+                    let pending = pendings.pop_front().unwrap();
+                    match pending.to_completed_fetch() {
+                        Ok(completed) => {
+                            self.completed_fetches.lock().push_back(completed);
+                            // Signal that buffer is not empty
+                            self.not_empty_notify.notify_waiters();
+                            has_completed = true;
+                        }
+                        Err(e) => {
+                            // todo: handle exception?
+                            log::error!("Error when completing: {e}");
                         }
                     }
                 } else {
                     break;
                 }
             }
-            if pendings.is_empty() {
-                pending_map.remove(table_bucket);
+
+            if has_completed {
+                self.not_empty_notify.notify_waiters();
+                if pendings.is_empty() {
+                    pending_map.remove(table_bucket);
+                }
             }
         }
     }
 
     /// Add a completed fetch to the buffer
     pub fn add(&self, completed_fetch: Box<dyn CompletedFetch>) {
-        let table_bucket = completed_fetch.table_bucket().clone();
+        let table_bucket = completed_fetch.table_bucket();
         let mut pending_map = self.pending_fetches.lock();
-        let should_notify = if let Some(pendings) = 
pending_map.get_mut(&table_bucket) {
+
+        if let Some(pendings) = pending_map.get_mut(table_bucket) {
             if pendings.is_empty() {
                 self.completed_fetches.lock().push_back(completed_fetch);
-                true
+                self.not_empty_notify.notify_waiters();
             } else {
-                // Convert to pending fetch wrapper
-                let completed_pending = 
CompletedPendingFetch::new(completed_fetch);
-                pendings.push_back(Box::new(completed_pending));
-                false
+                
pendings.push_back(Box::new(CompletedPendingFetch::new(completed_fetch)));
             }
         } else {
+            // If there's no pending fetch for this table_bucket,
+            // directly add to completed_fetches
             self.completed_fetches.lock().push_back(completed_fetch);
-            true
-        };
-        
-        // Signal that buffer is not empty if we added to completed_fetches
-        if should_notify {
             self.not_empty_notify.notify_waiters();
         }
     }
@@ -198,6 +198,14 @@ impl LogFetchBuffer {
     /// Get the set of buckets that have buffered data
     pub fn buffered_buckets(&self) -> Vec<TableBucket> {
         let mut buckets = Vec::new();
+
+        let next_in_line_fetch = self.next_in_line_fetch.lock();
+        if let Some(complete_fetch) = next_in_line_fetch.as_ref() {
+            if !complete_fetch.is_consumed() {
+                buckets.push(complete_fetch.table_bucket().clone());
+            }
+        }
+
         let completed = self.completed_fetches.lock();
         for fetch in completed.iter() {
             buckets.push(fetch.table_bucket().clone());
@@ -214,58 +222,6 @@ impl Default for LogFetchBuffer {
     }
 }
 
-/// Pending fetch that waits for fetch log response
-pub struct FetchPendingFetch {
-    table_bucket: TableBucket,
-    response: Arc<Mutex<Option<Result<PbFetchLogRespForBucket>>>>,
-    read_context: ReadContext,
-    fetch_offset: i64,
-}
-
-impl FetchPendingFetch {
-    pub fn new(
-        table_bucket: TableBucket,
-        read_context: ReadContext,
-        fetch_offset: i64,
-    ) -> (Self, Arc<Mutex<Option<Result<PbFetchLogRespForBucket>>>>) {
-        let response = Arc::new(Mutex::new(None));
-        let pending = Self {
-            table_bucket,
-            response: Arc::clone(&response),
-            read_context,
-            fetch_offset,
-        };
-        (pending, response)
-    }
-
-    pub fn set_response(&self, response: Result<PbFetchLogRespForBucket>) {
-        *self.response.lock() = Some(response);
-    }
-}
-
-impl PendingFetch for FetchPendingFetch {
-    fn table_bucket(&self) -> &TableBucket {
-        &self.table_bucket
-    }
-
-    fn is_completed(&self) -> bool {
-        self.response.lock().is_some()
-    }
-
-    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
-        let response = self.response.lock().take().ok_or_else(|| {
-            Error::Io(std::io::Error::other("Fetch response not available"))
-        })??;
-        let completed = DefaultCompletedFetch::new(
-            self.table_bucket,
-            &response,
-            self.read_context,
-            self.fetch_offset,
-        )?;
-        Ok(Box::new(completed))
-    }
-}
-
 /// A wrapper that makes a completed fetch look like a pending fetch
 struct CompletedPendingFetch {
     completed_fetch: Box<dyn CompletedFetch>,
@@ -294,64 +250,65 @@ impl PendingFetch for CompletedPendingFetch {
 /// Default implementation of CompletedFetch for in-memory log records
 pub struct DefaultCompletedFetch {
     table_bucket: TableBucket,
-    data: Vec<u8>,
+    log_record_batch: LogRecordsBatches,
     read_context: ReadContext,
-    fetch_offset: i64, // The offset at which this fetch started
     next_fetch_offset: i64,
     high_watermark: i64,
     size_in_bytes: usize,
     consumed: bool,
     initialized: bool,
-    // Pre-parsed records for efficient access
-    records: Vec<ScanRecord>,
-    current_index: usize,
+    records_read: usize,
+    current_record_iterator: Option<LogRecordIterator>,
+    current_record_batch: Option<LogRecordBatch>,
 }
 
 impl DefaultCompletedFetch {
     pub fn new(
         table_bucket: TableBucket,
-        fetch_response: &PbFetchLogRespForBucket,
+        log_record_batch: LogRecordsBatches,
+        size_in_bytes: usize,
         read_context: ReadContext,
         fetch_offset: i64,
+        high_watermark: i64,
     ) -> Result<Self> {
-        let data = fetch_response.records.clone().unwrap_or_default();
-        let size_in_bytes = data.len();
-        let high_watermark = fetch_response.high_watermark.unwrap_or(-1);
-
-        // Parse all records upfront
-        let mut records = Vec::new();
-        for log_record in &mut LogRecordsBatchs::new(&data) {
-            let last_offset = log_record.last_log_offset();
-            let batch_records = log_record.records(&read_context)?;
-            for record in batch_records {
-                records.push(record);
-            }
-            // Update next_fetch_offset based on the last batch
-            let next_offset = last_offset + 1;
-            // We'll update this when we actually consume records
-        }
-
-        // Set next_fetch_offset based on the last record if available
-        let next_fetch_offset = if let Some(last_record) = records.last() {
-            last_record.offset() + 1
-        } else {
-            fetch_offset
-        };
-
         Ok(Self {
             table_bucket,
-            data,
+            log_record_batch,
             read_context,
-            fetch_offset,
-            next_fetch_offset,
+            next_fetch_offset: fetch_offset,
             high_watermark,
             size_in_bytes,
             consumed: false,
             initialized: false,
-            records,
-            current_index: 0,
+            records_read: 0,
+            current_record_iterator: None,
+            current_record_batch: None,
         })
     }
+
+    /// Get the next fetched record, handling batch iteration and record 
skipping
+    fn next_fetched_record(&mut self) -> Result<Option<ScanRecord>> {
+        loop {
+            if let Some(record) = self
+                .current_record_iterator
+                .as_mut()
+                .and_then(Iterator::next)
+            {
+                if record.offset() >= self.next_fetch_offset {
+                    return Ok(Some(record));
+                }
+            } else if let Some(batch) = self.log_record_batch.next() {
+                self.current_record_iterator = 
Some(batch.records(&self.read_context)?);
+                self.current_record_batch = Some(batch);
+            } else {
+                if let Some(batch) = self.current_record_batch.take() {
+                    self.next_fetch_offset = batch.next_log_offset();
+                }
+                self.drain();
+                return Ok(None);
+            }
+        }
+    }
 }
 
 impl CompletedFetch for DefaultCompletedFetch {
@@ -360,30 +317,24 @@ impl CompletedFetch for DefaultCompletedFetch {
     }
 
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> 
{
+        // todo: handle corrupt_last_record
         if self.consumed {
             return Ok(Vec::new());
         }
 
-        let end_index = std::cmp::min(self.current_index + max_records, 
self.records.len());
-        let records = self.records[self.current_index..end_index].to_vec();
-        self.current_index = end_index;
+        let mut scan_records = Vec::new();
 
-        if self.current_index >= self.records.len() {
-            self.consumed = true;
-            // Update next_fetch_offset based on the last record
-            if let Some(last_record) = self.records.last() {
-                self.next_fetch_offset = last_record.offset() + 1;
+        for _ in 0..max_records {
+            if let Some(record) = self.next_fetched_record()? {
+                self.next_fetch_offset = record.offset() + 1;
+                self.records_read += 1;
+                scan_records.push(record);
+            } else {
+                break;
             }
-        } else if let Some(last_record) = records.last() {
-            // Update next_fetch_offset as we consume records
-            self.next_fetch_offset = last_record.offset() + 1;
         }
 
-        Ok(records)
-    }
-
-    fn next_fetch_offset(&self) -> i64 {
-        self.next_fetch_offset
+        Ok(scan_records)
     }
 
     fn is_consumed(&self) -> bool {
@@ -392,7 +343,6 @@ impl CompletedFetch for DefaultCompletedFetch {
 
     fn drain(&mut self) {
         self.consumed = true;
-        self.current_index = self.records.len();
     }
 
     fn size_in_bytes(&self) -> usize {
@@ -411,7 +361,7 @@ impl CompletedFetch for DefaultCompletedFetch {
         self.initialized = true;
     }
 
-    fn fetch_offset(&self) -> i64 {
-        self.fetch_offset
+    fn next_fetch_offset(&self) -> i64 {
+        self.next_fetch_offset
     }
 }
diff --git a/crates/fluss/src/client/table/remote_log.rs 
b/crates/fluss/src/client/table/remote_log.rs
index 4c835bb..9ac61c3 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -18,11 +18,11 @@ use 
crate::client::table::log_fetch_buffer::{CompletedFetch, DefaultCompletedFet
 use crate::error::{Error, Result};
 use crate::io::{FileIO, Storage};
 use crate::metadata::TableBucket;
-use crate::proto::PbFetchLogRespForBucket;
 use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
-use crate::record::ReadContext;
+use crate::record::{LogRecordsBatches, ReadContext};
 use crate::util::delete_file;
-use parking_lot::Mutex;
+use parking_lot::{Mutex, RwLock};
+use std::collections::HashMap;
 use std::io;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
@@ -72,19 +72,19 @@ pub struct RemoteLogFetchInfo {
 }
 
 impl RemoteLogFetchInfo {
-    pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) 
-> Result<Self> {
+    pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) 
-> Self {
         let segments = info
             .remote_log_segments
             .iter()
             .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone()))
             .collect();
 
-        Ok(Self {
+        Self {
             remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
             partition_name: info.partition_name.clone(),
             remote_log_segments: segments,
             first_start_pos: info.first_start_pos.unwrap_or(0),
-        })
+        }
     }
 }
 
@@ -94,13 +94,15 @@ type CompletionCallback = Box<dyn Fn() + Send + Sync>;
 pub struct RemoteLogDownloadFuture {
     result: Arc<Mutex<Option<Result<PathBuf>>>>,
     completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
+    // todo: add recycleCallback
 }
 
 impl RemoteLogDownloadFuture {
     pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self {
         let result = Arc::new(Mutex::new(None));
         let result_clone = Arc::clone(&result);
-        let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> = 
Arc::new(Mutex::new(Vec::new()));
+        let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> =
+            Arc::new(Mutex::new(Vec::new()));
         let callbacks_clone = Arc::clone(&completion_callbacks);
 
         // Spawn a task to wait for the download and update result, then call 
callbacks
@@ -108,7 +110,10 @@ impl RemoteLogDownloadFuture {
             let download_result = match receiver.await {
                 Ok(Ok(path)) => Ok(path),
                 Ok(Err(e)) => Err(e),
-                Err(e) => Err(Error::Io(io::Error::other(format!("Download 
future cancelled: {e:?}")))),
+                Err(e) => Err(Error::UnexpectedError {
+                    message: format!("Download future cancelled: {e:?}"),
+                    source: None,
+                }),
             };
             *result_clone.lock() = Some(download_result);
 
@@ -152,17 +157,25 @@ impl RemoteLogDownloadFuture {
         }
     }
 
-    /// Get the downloaded file path
-    pub async fn get_file_path(&mut self) -> Result<PathBuf> {
-        let receiver = self.receiver.take().ok_or_else(|| 
Error::UnexpectedError {
-            message: "Downloaded file already consumed".to_string(),
-            source: None,
-        })?;
+    pub fn is_done(&self) -> bool {
+        self.result.lock().is_some()
+    }
 
-        receiver.await.map_err(|e| Error::UnexpectedError {
-            message: format!("Download future cancelled: {e:?}"),
-            source: None,
-        })?
+    /// Get the downloaded file path (synchronous, only works after is_done() 
returns true)
+    pub fn get_file_path(&self) -> Result<PathBuf> {
+        // todo: handle download failure
+        let guard = self.result.lock();
+        match guard.as_ref() {
+            Some(Ok(path)) => Ok(path.clone()),
+            Some(Err(e)) => Err(Error::IoUnexpectedError {
+                message: format!("Download failed: {e}"),
+                source: io::Error::other(format!("{e:?}")),
+            }),
+            None => Err(Error::IoUnexpectedError {
+                message: "Download not completed yet".to_string(),
+                source: io::Error::other("Download not completed yet"),
+            }),
+        }
     }
 }
 
@@ -198,8 +211,13 @@ impl RemoteLogDownloader {
         let remote_fs_props = self.remote_fs_props.read().clone();
         // Spawn async download task
         tokio::spawn(async move {
-            let result =
-                Self::download_file(&remote_log_tablet_dir, &remote_path, 
&local_file_path).await;
+            let result = Self::download_file(
+                &remote_log_tablet_dir,
+                &remote_path,
+                &local_file_path,
+                &remote_fs_props,
+            )
+            .await;
             let _ = sender.send(result);
         });
         RemoteLogDownloadFuture::new(receiver)
@@ -305,7 +323,6 @@ impl RemoteLogDownloader {
     }
 }
 
-
 /// Pending fetch that waits for remote log file to be downloaded
 pub struct RemotePendingFetch {
     segment: RemoteLogSegment,
@@ -348,39 +365,32 @@ impl PendingFetch for RemotePendingFetch {
     fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
         // Get the file path (this should only be called when is_completed() 
returns true)
         let file_path = self.download_future.get_file_path()?;
-
         // Read the file data synchronously (we're in a sync context)
         // Note: This is a limitation - we need to use blocking I/O here
-        let file_data = std::fs::read(&file_path).map_err(|e| {
-            Error::Io(io::Error::other(format!("Failed to read downloaded 
file: {e:?}")))
+        let mut file_data = std::fs::read(&file_path).map_err(|e| 
Error::IoUnexpectedError {
+            message: format!("Failed to read downloaded file: {file_path:?}."),
+            source: e,
         })?;
 
         // Slice the data if needed
         let data = if self.pos_in_log_segment > 0 {
-            &file_data[self.pos_in_log_segment as usize..]
+            file_data.split_off(self.pos_in_log_segment as usize)
         } else {
-            &file_data
+            file_data
         };
 
-        // Create a mock PbFetchLogRespForBucket for DefaultCompletedFetch
-        // We'll use the data we read from the file
-        let fetch_response = PbFetchLogRespForBucket {
-            bucket_id: self.segment.table_bucket.bucket_id(),
-            partition_id: None,
-            error_code: None,
-            error_message: None,
-            high_watermark: Some(self.high_watermark),
-            log_start_offset: None,
-            remote_log_fetch_info: None,
-            records: Some(data.to_vec()),
-        };
+        let size_in_bytes = data.len();
+
+        let log_record_batch = LogRecordsBatches::new(data);
 
         // Create DefaultCompletedFetch from the data
         let completed_fetch = DefaultCompletedFetch::new(
             self.segment.table_bucket,
-            &fetch_response,
+            log_record_batch,
+            size_in_bytes,
             self.read_context,
             self.fetch_offset,
+            self.high_watermark,
         )?;
 
         // Delete the downloaded local file to free disk (async, but we'll do 
it in background)
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 5053865..26d37d7 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -18,26 +18,26 @@
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
+use crate::client::table::log_fetch_buffer::{
+    CompletedFetch, DefaultCompletedFetch, LogFetchBuffer,
+};
+use crate::client::table::remote_log::{
+    RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
+};
 use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTable};
-use crate::record::{to_arrow_schema, ReadContext, ScanRecord, ScanRecords};
-use crate::rpc::{message, RpcClient};
+use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
+use crate::rpc::{RpcClient, message};
 use crate::util::FairBucketStatusMap;
 use arrow_schema::SchemaRef;
+use log::{debug, error, warn};
 use parking_lot::{Mutex, RwLock};
 use std::collections::{HashMap, HashSet};
 use std::slice::from_ref;
 use std::sync::Arc;
 use std::time::Duration;
 use tempfile::TempDir;
-use log::warn;
-use crate::client::table::log_fetch_buffer::{
-    CompletedFetch, DefaultCompletedFetch, LogFetchBuffer,
-};
-use crate::client::table::remote_log::{
-    RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
-};
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
@@ -198,7 +198,11 @@ impl LogScanner {
 
             // Wait for buffer to become non-empty with remaining time
             let remaining = deadline - now;
-            let has_data = 
self.log_fetcher.log_fetch_buffer.await_not_empty(remaining).await;
+            let has_data = self
+                .log_fetcher
+                .log_fetch_buffer
+                .await_not_empty(remaining)
+                .await;
 
             if !has_data {
                 // Timeout while waiting
@@ -220,7 +224,12 @@ impl LogScanner {
     }
 
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, 
Vec<ScanRecord>>> {
-        // Send fetch requests (non-blocking)
+        let result = self.log_fetcher.collect_fetches()?;
+        if !result.is_empty() {
+            return Ok(result);
+        }
+
+        // send any new fetches (won't resend pending fetches).
         self.log_fetcher.send_fetches().await?;
 
         // Collect completed fetches from buffer
@@ -229,14 +238,15 @@ impl LogScanner {
 }
 
 struct LogFetcher {
-    table_path: TablePath,
     conns: Arc<RpcClient>,
-    table_info: TableInfo,
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     read_context: ReadContext,
+    remote_read_context: ReadContext,
     remote_log_downloader: Arc<RemoteLogDownloader>,
-    credentials_cache: CredentialsCache,
+    // todo: consider scheduling a background thread to refresh the
+    // token instead of refreshing it in the fetch phase
+    credentials_cache: Arc<CredentialsCache>,
     log_fetch_buffer: Arc<LogFetchBuffer>,
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
 }
@@ -250,19 +260,21 @@ impl LogFetcher {
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
-        let read_context = Self::create_read_context(full_arrow_schema, 
projected_fields.clone());
+        let read_context =
+            Self::create_read_context(full_arrow_schema.clone(), 
projected_fields.clone(), false);
+        let remote_read_context =
+            Self::create_read_context(full_arrow_schema, 
projected_fields.clone(), true);
 
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
 
         Ok(LogFetcher {
-            table_path: table_info.table_path.clone(),
-            conns,
-            table_info,
-            metadata,
+            conns: conns.clone(),
+            metadata: metadata.clone(),
             log_scanner_status,
             read_context,
+            remote_read_context,
             remote_log_downloader: 
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
-            credentials_cache: CredentialsCache::new(),
+            credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), 
metadata.clone())),
             log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
             nodes_with_pending_fetch_requests: 
Arc::new(Mutex::new(HashSet::new())),
         })
@@ -271,22 +283,26 @@ impl LogFetcher {
     fn create_read_context(
         full_arrow_schema: SchemaRef,
         projected_fields: Option<Vec<usize>>,
+        is_from_remote: bool,
     ) -> ReadContext {
         match projected_fields {
-            None => ReadContext::new(full_arrow_schema),
-            Some(fields) => 
ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+            None => ReadContext::new(full_arrow_schema, is_from_remote),
+            Some(fields) => {
+                ReadContext::with_projection_pushdown(full_arrow_schema, 
fields, is_from_remote)
+            }
         }
     }
 
     /// Send fetch requests asynchronously without waiting for responses
     async fn send_fetches(&self) -> Result<()> {
+        // todo: check and update metadata like fluss-java in case the leader changes
         let fetch_request = self.prepare_fetch_log_requests().await;
 
         for (leader, fetch_request) in fetch_request {
+            debug!("Adding pending request for node id {leader}");
             // Check if we already have a pending request for this node
             {
-                self.nodes_with_pending_fetch_requests.lock()
-                    .insert(leader);
+                self.nodes_with_pending_fetch_requests.lock().insert(leader);
             }
 
             let cluster = self.metadata.get_cluster().clone();
@@ -295,45 +311,57 @@ impl LogFetcher {
             let log_fetch_buffer = self.log_fetch_buffer.clone();
             let log_scanner_status = self.log_scanner_status.clone();
             let read_context = self.read_context.clone();
+            let remote_read_context = self.remote_read_context.clone();
             let remote_log_downloader = 
Arc::clone(&self.remote_log_downloader);
+            let creds_cache = self.credentials_cache.clone();
             let nodes_with_pending = 
self.nodes_with_pending_fetch_requests.clone();
 
             // Spawn async task to handle the fetch request
             tokio::spawn(async move {
+                // make sure the leader is always removed from the pending nodes
+                let _guard = scopeguard::guard((), |_| {
+                    nodes_with_pending.lock().remove(&leader);
+                });
+
                 let server_node = cluster
                     .get_tablet_server(leader)
-                    .expect("todo: handle leader not exist.").clone();
-                let result = conns.get_connection(&server_node).await;
-                match result {
-                    Ok(con) => {
-                        let fetch_result = con
-                            
.request(message::FetchLogRequest::new(fetch_request))
-                            .await;
-
-                        match fetch_result {
-                            Ok(fetch_response) => {
-                                Self::handle_fetch_response(
-                                    fetch_response,
-                                    &log_fetch_buffer,
-                                    &log_scanner_status,
-                                    &read_context,
-                                    &remote_log_downloader,
-                                ).await;
-                            }
-                            Err(e) => {
-                                warn!("Failed to fetch log from destination 
node {:?}: {:?}",
-                                    server_node,
-                                    e);
-                            }
-                        }
+                    .expect("todo: handle leader not exist.");
+
+                let con = match conns.get_connection(server_node).await {
+                    Ok(con) => con,
+                    Err(e) => {
+                        // todo: handle failed to get connection
+                        warn!("Failed to get connection to destination node: 
{e:?}");
+                        return;
                     }
+                };
+
+                let fetch_response = match con
+                    .request(message::FetchLogRequest::new(fetch_request))
+                    .await
+                {
+                    Ok(resp) => resp,
                     Err(e) => {
-                        warn!("Failed to get connection to destination node: 
{:?}", e);
+                        // todo: handle failure to fetch log from destination node
+                        warn!("Failed to fetch log from destination node 
{server_node:?}: {e:?}");
+                        return;
                     }
-                }
+                };
 
-                // Remove from pending set
-                nodes_with_pending.lock().remove(&leader);
+                if let Err(e) = Self::handle_fetch_response(
+                    fetch_response,
+                    &log_fetch_buffer,
+                    &log_scanner_status,
+                    &read_context,
+                    &remote_read_context,
+                    &remote_log_downloader,
+                    &creds_cache,
+                )
+                .await
+                {
+                    // todo: handle failure to process the fetch response
+                    error!("Fail to handle fetch response: {e:?}");
+                }
             });
         }
 
@@ -346,8 +374,10 @@ impl LogFetcher {
         log_fetch_buffer: &Arc<LogFetchBuffer>,
         log_scanner_status: &Arc<LogScannerStatus>,
         read_context: &ReadContext,
+        remote_read_context: &ReadContext,
         remote_log_downloader: &Arc<RemoteLogDownloader>,
-    ) {
+        credentials_cache: &Arc<CredentialsCache>,
+    ) -> Result<()> {
         for pb_fetch_log_resp in fetch_response.tables_resp {
             let table_id = pb_fetch_log_resp.table_id;
             let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
@@ -356,90 +386,107 @@ impl LogFetcher {
                 let bucket: i32 = fetch_log_for_bucket.bucket_id;
                 let table_bucket = TableBucket::new(table_id, bucket);
 
+                // todo: check the fetch result code per bucket
+                let Some(fetch_offset) = 
log_scanner_status.get_bucket_offset(&table_bucket) else {
+                    debug!(
+                        "Ignoring fetch log response for bucket {table_bucket} 
because the bucket has been unsubscribed."
+                    );
+                    continue;
+                };
+
                 // Check if this is a remote log fetch
-                if let Some(ref remote_log_fetch_info) =
-                    fetch_log_for_bucket.remote_log_fetch_info
+                if let Some(ref remote_log_fetch_info) = 
fetch_log_for_bucket.remote_log_fetch_info
                 {
+                    // set remote fs props
+                    let remote_fs_props = 
credentials_cache.get_or_refresh().await?;
+                    remote_log_downloader.set_remote_fs_props(remote_fs_props);
+
+                    let remote_fetch_info =
+                        RemoteLogFetchInfo::from_proto(remote_log_fetch_info, 
table_bucket.clone());
+
+                    let high_watermark = 
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+                    Self::pending_remote_fetches(
+                        remote_log_downloader.clone(),
+                        log_fetch_buffer.clone(),
+                        remote_read_context.clone(),
+                        &table_bucket,
+                        remote_fetch_info,
+                        fetch_offset,
+                        high_watermark,
+                    );
+                } else if fetch_log_for_bucket.records.is_some() {
+                    // Handle regular in-memory records - create completed 
fetch directly
+                    let high_watermark = 
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+                    let records = 
fetch_log_for_bucket.records.unwrap_or(vec![]);
+                    let size_in_bytes = records.len();
+                    let log_record_batch = LogRecordsBatches::new(records);
 
-                    let remote_fetch_info = match 
RemoteLogFetchInfo::from_proto(
-                        remote_log_fetch_info,
+                    match DefaultCompletedFetch::new(
                         table_bucket.clone(),
+                        log_record_batch,
+                        size_in_bytes,
+                        read_context.clone(),
+                        fetch_offset,
+                        high_watermark,
                     ) {
-                        Ok(info) => info,
-                        Err(e) => {
-                            eprintln!("Failed to parse remote log fetch info: 
{:?}", e);
-                            continue;
-                        }
-                    };
-
-                        if let Some(fetch_offset) =
-                            log_scanner_status.get_bucket_offset(&table_bucket)
-                        {
-                            let high_watermark = 
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
-                            // Download and process remote log segments
-                            let mut pos_in_log_segment = 
remote_fetch_info.first_start_pos;
-                            let mut current_fetch_offset = fetch_offset;
-                            // todo: make segment download parallelly
-                            for (i, segment) in
-                                
remote_fetch_info.remote_log_segments.iter().enumerate()
-                            {
-                                if i > 0 {
-                                    pos_in_log_segment = 0;
-                                    current_fetch_offset = 
segment.start_offset;
-                                }
-
-                            let download_future = 
remote_log_downloader.request_remote_log(
-                                &remote_fetch_info.remote_log_tablet_dir,
-                                segment,
-                            );
-
-                            let table_bucket_clone = table_bucket.clone();
-
-                            // Register callback to be called when download 
completes
-                            // (similar to Java's downloadFuture.onComplete)
-                            // This must be done before creating 
RemotePendingFetch to avoid move issues
-                            let log_fetch_buffer_clone = 
Arc::clone(log_fetch_buffer);
-                            download_future.on_complete(move || {
-                                
log_fetch_buffer_clone.try_complete(&table_bucket_clone);
-                            });
-
-                            let pending_fetch = RemotePendingFetch::new(
-                                segment.clone(),
-                                download_future,
-                                pos_in_log_segment,
-                                current_fetch_offset,
-                                high_watermark,
-                                read_context.clone(),
-                            );
-                            // Add to pending fetches in buffer (similar to 
Java's logFetchBuffer.pend)
-                            log_fetch_buffer.pend(Box::new(pending_fetch));
+                        Ok(completed_fetch) => {
+                            log_fetch_buffer.add(Box::new(completed_fetch));
                         }
-                    } else {
-                        // if the offset is null, it means the bucket has been 
unsubscribed,
-                        // skip processing and continue to the next bucket.
-                        continue;
-                    }
-                } else if fetch_log_for_bucket.records.is_some() {
-                    // Handle regular in-memory records - create completed 
fetch directly
-                    if let Some(fetch_offset) = 
log_scanner_status.get_bucket_offset(&table_bucket) {
-                        match DefaultCompletedFetch::new(
-                            table_bucket.clone(),
-                            &fetch_log_for_bucket,
-                            read_context.clone(),
-                            fetch_offset,
-                        ) {
-                            Ok(completed_fetch) => {
-                                
log_fetch_buffer.add(Box::new(completed_fetch));
-                            }
-                            Err(e) => {
-                                // todo: handle error
-                                eprintln!("Failed to create completed fetch: 
{:?}", e);
-                            }
+                        Err(e) => {
+                            // todo: handle error
+                            log::warn!("Failed to create completed fetch: 
{e:?}");
                         }
                     }
                 }
             }
         }
+        Ok(())
+    }
+
+    fn pending_remote_fetches(
+        remote_log_downloader: Arc<RemoteLogDownloader>,
+        log_fetch_buffer: Arc<LogFetchBuffer>,
+        read_context: ReadContext,
+        table_bucket: &TableBucket,
+        remote_fetch_info: RemoteLogFetchInfo,
+        fetch_offset: i64,
+        high_watermark: i64,
+    ) {
+        // Download and process remote log segments
+        let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
+        let mut current_fetch_offset = fetch_offset;
+        for (i, segment) in 
remote_fetch_info.remote_log_segments.iter().enumerate() {
+            if i > 0 {
+                pos_in_log_segment = 0;
+                current_fetch_offset = segment.start_offset;
+            }
+
+            // todo:
+            // 1: limit the max number of threads used to download remote segments
+            // 2: introduce a priority queue so the earliest segment gets the highest priority
+            let download_future = remote_log_downloader
+                .request_remote_log(&remote_fetch_info.remote_log_tablet_dir, 
segment);
+
+            // Register callback to be called when download completes
+            // (similar to Java's downloadFuture.onComplete)
+            // This must be done before creating RemotePendingFetch to avoid 
move issues
+            let table_bucket = table_bucket.clone();
+            let log_fetch_buffer_clone = log_fetch_buffer.clone();
+            download_future.on_complete(move || {
+                log_fetch_buffer_clone.try_complete(&table_bucket);
+            });
+
+            let pending_fetch = RemotePendingFetch::new(
+                segment.clone(),
+                download_future,
+                pos_in_log_segment,
+                current_fetch_offset,
+                high_watermark,
+                read_context.clone(),
+            );
+            // Add to pending fetches in buffer (similar to Java's 
logFetchBuffer.pend)
+            log_fetch_buffer.pend(Box::new(pending_fetch));
+        }
     }
 
     /// Collect completed fetches from buffer
@@ -470,14 +517,15 @@ impl LogFetcher {
                                 // (2) there are no fetched records with 
actual content preceding this
                                 // exception.
                                 if result.is_empty() && size_in_bytes == 0 {
-                                    // todo: consider it?
+                                    // todo: do we need to handle this like the Java client?
                                     // self.log_fetch_buffer.poll();
                                 }
                                 return Err(e);
                             }
                         }
                     } else {
-                        
self.log_fetch_buffer.set_next_in_line_fetch(Some(completed_fetch));
+                        self.log_fetch_buffer
+                            .set_next_in_line_fetch(Some(completed_fetch));
                     }
                     // Note: peek() already removed the fetch from buffer, so 
no need to call poll()
                 } else {
@@ -487,7 +535,8 @@ impl LogFetcher {
             } else {
                 // Fetch records from next_in_line
                 if let Some(mut next_fetch) = next_in_line {
-                    let records = self.fetch_records_from_fetch(&mut 
next_fetch, records_remaining)?;
+                    let records =
+                        self.fetch_records_from_fetch(&mut next_fetch, 
records_remaining)?;
 
                     if !records.is_empty() {
                         let table_bucket = next_fetch.table_bucket().clone();
@@ -510,29 +559,22 @@ impl LogFetcher {
         &self,
         mut completed_fetch: Box<dyn CompletedFetch>,
     ) -> Result<Option<Box<dyn CompletedFetch>>> {
-
-        // todo: handle initialize failure
-
-        let table_bucket = completed_fetch.table_bucket().clone();
-        let fetch_offset = completed_fetch.fetch_offset();
+        // todo: handle error in initialize fetch
+        let table_bucket = completed_fetch.table_bucket();
+        let fetch_offset = completed_fetch.next_fetch_offset();
 
         // Check if bucket is still subscribed
-        let current_offset = 
self.log_scanner_status.get_bucket_offset(&table_bucket);
-        if current_offset.is_none() {
+        let Some(current_offset) = 
self.log_scanner_status.get_bucket_offset(table_bucket) else {
             warn!(
-                "Discarding stale fetch response for bucket {:?} since the 
bucket has been unsubscribed",
-                table_bucket
+                "Discarding stale fetch response for bucket {table_bucket:?} 
since the bucket has been unsubscribed"
             );
             return Ok(None);
-        }
-
-        let current_offset = current_offset.unwrap();
+        };
 
         // Check if offset matches
         if fetch_offset != current_offset {
             warn!(
-                "Discarding stale fetch response for bucket {:?} since its 
offset {} does not match the expected offset {}",
-                table_bucket, fetch_offset, current_offset
+                "Discarding stale fetch response for bucket {table_bucket:?} 
since its offset {fetch_offset} does not match the expected offset 
{current_offset}"
             );
             return Ok(None);
         }
@@ -540,7 +582,8 @@ impl LogFetcher {
         // Update high watermark
         let high_watermark = completed_fetch.high_watermark();
         if high_watermark >= 0 {
-            self.log_scanner_status.update_high_watermark(&table_bucket, 
high_watermark);
+            self.log_scanner_status
+                .update_high_watermark(table_bucket, high_watermark);
         }
 
         completed_fetch.set_initialized();
@@ -558,15 +601,14 @@ impl LogFetcher {
 
         if current_offset.is_none() {
             warn!(
-                "Ignoring fetched records for {:?} since the bucket has been 
unsubscribed",
-                table_bucket
+                "Ignoring fetched records for {table_bucket:?} since the 
bucket has been unsubscribed"
             );
             next_in_line_fetch.drain();
             return Ok(Vec::new());
         }
 
         let current_offset = current_offset.unwrap();
-        let fetch_offset = next_in_line_fetch.fetch_offset();
+        let fetch_offset = next_in_line_fetch.next_fetch_offset();
 
         // Check if this fetch is next in line
         if fetch_offset == current_offset {
@@ -574,15 +616,15 @@ impl LogFetcher {
             let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
 
             if next_fetch_offset > current_offset {
-                self.log_scanner_status.update_offset(&table_bucket, 
next_fetch_offset);
+                self.log_scanner_status
+                    .update_offset(&table_bucket, next_fetch_offset);
             }
 
             Ok(records)
         } else {
             // These records aren't next in line, ignore them
             warn!(
-                "Ignoring fetched records for {:?} at offset {} since the 
current offset is {}",
-                table_bucket, fetch_offset, current_offset
+                "Ignoring fetched records for {table_bucket:?} at offset 
{fetch_offset} since the current offset is {current_offset}"
             );
             next_in_line_fetch.drain();
             Ok(Vec::new())
@@ -601,27 +643,43 @@ impl LogFetcher {
             let offset = match 
self.log_scanner_status.get_bucket_offset(&bucket) {
                 Some(offset) => offset,
                 None => {
-                    // todo: debug
+                    debug!(
+                        "Skipping fetch request for bucket {bucket} because 
the bucket has been unsubscribed."
+                    );
                     continue;
                 }
             };
 
-            if let Some(leader) = self.get_table_bucket_leader(&bucket) {
-                if !self.nodes_with_pending_fetch_requests.lock()
-                    .contains(&leader) {
-                    let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
-                        partition_id: None,
-                        bucket_id: bucket.bucket_id(),
-                        fetch_offset: offset,
-                        // 1M
-                        max_fetch_bytes: 1024 * 1024,
-                    };
-
-                    fetch_log_req_for_buckets
-                        .entry(leader)
-                        .or_insert_with(Vec::new)
-                        .push(fetch_log_req_for_bucket);
-                    ready_for_fetch_count += 1;
+            match self.get_table_bucket_leader(&bucket) {
+                None => {
+                    log::trace!(
+                        "Skipping fetch request for bucket {bucket} because 
leader is not available."
+                    )
+                }
+                Some(leader) => {
+                    if self
+                        .nodes_with_pending_fetch_requests
+                        .lock()
+                        .contains(&leader)
+                    {
+                        log::trace!(
+                            "Skipping fetch request for bucket {bucket} 
because previous request to server {leader} has not been processed."
+                        )
+                    } else {
+                        let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
+                            partition_id: None,
+                            bucket_id: bucket.bucket_id(),
+                            fetch_offset: offset,
+                            // 1M
+                            max_fetch_bytes: 1024 * 1024,
+                        };
+
+                        fetch_log_req_for_buckets
+                            .entry(leader)
+                            .or_insert_with(Vec::new)
+                            .push(fetch_log_req_for_bucket);
+                        ready_for_fetch_count += 1;
+                    }
                 }
             }
         }
@@ -662,7 +720,8 @@ impl LogFetcher {
         // Get buckets that are not already in the buffer
         let buffered = self.log_fetch_buffer.buffered_buckets();
         let buffered_set: HashSet<TableBucket> = 
buffered.into_iter().collect();
-        self.log_scanner_status.fetchable_buckets(|tb| 
!buffered_set.contains(tb))
+        self.log_scanner_status
+            .fetchable_buckets(|tb| !buffered_set.contains(tb))
     }
 
     fn get_table_bucket_leader(&self, tb: &TableBucket) -> Option<i32> {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 9295713..7fa878e 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -39,6 +39,7 @@ use arrow_schema::SchemaRef;
 use arrow_schema::{DataType as ArrowDataType, Field};
 use byteorder::WriteBytesExt;
 use byteorder::{ByteOrder, LittleEndian};
+use bytes::Bytes;
 use crc32c::crc32c;
 use parking_lot::Mutex;
 use std::{
@@ -347,17 +348,17 @@ pub trait ToArrow {
     fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
 }
 
-pub struct LogRecordsBatchs<'a> {
-    data: &'a [u8],
+pub struct LogRecordsBatches {
+    data: Bytes,
     current_pos: usize,
     remaining_bytes: usize,
 }
 
-impl<'a> LogRecordsBatchs<'a> {
-    pub fn new(data: &'a [u8]) -> Self {
+impl LogRecordsBatches {
+    pub fn new(data: Vec<u8>) -> Self {
         let remaining_bytes: usize = data.len();
         Self {
-            data,
+            data: Bytes::from(data),
             current_pos: 0,
             remaining_bytes,
         }
@@ -378,14 +379,17 @@ impl<'a> LogRecordsBatchs<'a> {
     }
 }
 
-impl<'a> Iterator for &'a mut LogRecordsBatchs<'a> {
-    type Item = LogRecordBatch<'a>;
+impl Iterator for LogRecordsBatches {
+    type Item = LogRecordBatch;
 
     fn next(&mut self) -> Option<Self::Item> {
         match self.next_batch_size() {
             Some(batch_size) => {
-                let data_slice = &self.data[self.current_pos..self.current_pos 
+ batch_size];
-                let record_batch = LogRecordBatch::new(data_slice);
+                let start = self.current_pos;
+                let end = start + batch_size;
+                // `Bytes::slice` returns a cheap, reference-counted view of the
+                // shared buffer, so the batch does not borrow from `self`
+                let record_batch = 
LogRecordBatch::new(self.data.slice(start..end));
                 self.current_pos += batch_size;
                 self.remaining_bytes -= batch_size;
                 Some(record_batch)
@@ -395,13 +399,13 @@ impl<'a> Iterator for &'a mut LogRecordsBatchs<'a> {
     }
 }
 
-pub struct LogRecordBatch<'a> {
-    data: &'a [u8],
+pub struct LogRecordBatch {
+    data: Bytes,
 }
 
 #[allow(dead_code)]
-impl<'a> LogRecordBatch<'a> {
-    pub fn new(data: &'a [u8]) -> Self {
+impl LogRecordBatch {
+    pub fn new(data: Bytes) -> Self {
         LogRecordBatch { data }
     }
 
@@ -710,6 +714,7 @@ pub struct ReadContext {
     target_schema: SchemaRef,
     full_schema: SchemaRef,
     projection: Option<Projection>,
+    is_from_remote: bool,
 }
 
 #[derive(Clone)]
@@ -723,24 +728,35 @@ struct Projection {
 }
 
 impl ReadContext {
-    pub fn new(arrow_schema: SchemaRef) -> ReadContext {
+    pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext {
         ReadContext {
             target_schema: arrow_schema.clone(),
             full_schema: arrow_schema,
             projection: None,
+            is_from_remote,
         }
     }
 
     pub fn with_projection_pushdown(
         arrow_schema: SchemaRef,
         projected_fields: Vec<usize>,
+        is_from_remote: bool,
     ) -> ReadContext {
         let target_schema = Self::project_schema(arrow_schema.clone(), 
projected_fields.as_slice());
-        let mut sorted_fields = projected_fields.clone();
-        sorted_fields.sort_unstable();
+        let (need_do_reorder, sorted_fields) = {
+            // currently, for remote reads, arrow log doesn't support projection pushdown,
+            // so reordering is only needed when the data is not from remote
+            if !is_from_remote {
+                let mut sorted_fields = projected_fields.clone();
+                sorted_fields.sort_unstable();
+                (!sorted_fields.eq(&projected_fields), sorted_fields)
+            } else {
+                (false, vec![])
+            }
+        };
 
         let project = {
-            if !sorted_fields.eq(&projected_fields) {
+            if need_do_reorder {
                 // reordering is required
                 // Calculate reordering indexes to transform from sorted order 
to user-requested order
                 let mut reordering_indexes = 
Vec::with_capacity(projected_fields.len());
@@ -778,6 +794,7 @@ impl ReadContext {
             target_schema,
             full_schema: arrow_schema,
             projection: Some(project),
+            is_from_remote,
         }
     }
 
@@ -805,17 +822,24 @@ impl ReadContext {
     pub fn record_batch(&self, data: &[u8]) -> Result<RecordBatch> {
         let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
 
-        // the record batch from server must be ordered by field pos,
-        // according to project to decide what arrow schema to use
-        // to parse the record batch
-        let resolve_schema = match self.projection {
-            Some(ref projection) => {
-                // projection, should use ordered schema by project field pos
-                projection.ordered_schema.clone()
-            }
-            None => {
-                // no projection, use target output schema
-                self.target_schema.clone()
+        let resolve_schema = {
+            // if from remote, no projection, need to use full schema
+            if self.is_from_remote {
+                self.full_schema.clone()
+            } else {
+                // the record batch from server must be ordered by field pos,
+                // according to project to decide what arrow schema to use
+                // to parse the record batch
+                match self.projection {
+                    Some(ref projection) => {
+                        // projection, should use ordered schema by project 
field pos
+                        projection.ordered_schema.clone()
+                    }
+                    None => {
+                        // no projection, use target output schema
+                        self.target_schema.clone()
+                    }
+                }
             }
         };
 
@@ -829,14 +853,27 @@ impl ReadContext {
         )?;
 
         let record_batch = match &self.projection {
-            Some(projection) if projection.reordering_needed => {
-                // Reorder columns if needed (when projection pushdown with 
non-sorted order)
-                let reordered_columns: Vec<_> = projection
-                    .reordering_indexes
-                    .iter()
-                    .map(|&idx| record_batch.column(idx).clone())
-                    .collect();
-                RecordBatch::try_new(self.target_schema.clone(), 
reordered_columns)?
+            Some(projection) => {
+                let reordered_columns = {
+                    // need to do reorder
+                    if self.is_from_remote {
+                        Some(&projection.projected_fields)
+                    } else if projection.reordering_needed {
+                        Some(&projection.reordering_indexes)
+                    } else {
+                        None
+                    }
+                };
+                match reordered_columns {
+                    Some(reordered_columns) => {
+                        let arrow_columns = reordered_columns
+                            .iter()
+                            .map(|&idx| record_batch.column(idx).clone())
+                            .collect();
+                        RecordBatch::try_new(self.target_schema.clone(), 
arrow_columns)?
+                    }
+                    _ => record_batch,
+                }
             }
             _ => record_batch,
         };
diff --git a/crates/fluss/tests/integration/table.rs 
b/crates/fluss/tests/integration/table.rs
index a54f469..9eec98e 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -178,7 +178,7 @@ mod table_test {
         }
 
         let scan_records_projected = log_scanner_projected
-            .poll(std::time::Duration::from_secs(5))
+            .poll(std::time::Duration::from_secs(10))
             .await
             .expect("Failed to poll");
 
@@ -227,7 +227,7 @@ mod table_test {
 
         // Poll for records
         let scan_records = log_scanner
-            .poll(tokio::time::Duration::from_secs(5))
+            .poll(tokio::time::Duration::from_secs(10))
             .await
             .expect("Failed to poll records");
 
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs 
b/crates/fluss/tests/integration/table_remote_scan.rs
index ca61ff8..bdbced9 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -175,6 +175,8 @@ mod table_remote_scan_test {
         let num_buckets = table.table_info().get_num_buckets();
         let log_scanner = table
             .new_scan()
+            .project(&[1, 0])
+            .unwrap()
             .create_log_scanner()
             .expect("Failed to create log scanner");
         for bucket_id in 0..num_buckets {
@@ -186,7 +188,7 @@ mod table_remote_scan_test {
 
         let mut records = Vec::with_capacity(record_count);
         let start = std::time::Instant::now();
-        const MAX_WAIT_DURATION: Duration = Duration::from_secs(30);
+        const MAX_WAIT_DURATION: Duration = Duration::from_secs(60);
         while records.len() < record_count {
             if start.elapsed() > MAX_WAIT_DURATION {
                 panic!(
@@ -208,8 +210,8 @@ mod table_remote_scan_test {
             let row = record.row();
             let expected_c1 = i as i32;
             let expected_c2 = format!("v{}", i);
-            assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", 
i);
-            assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index 
{}", i);
+            assert_eq!(row.get_int(1), expected_c1, "c1 mismatch at index {}", 
i);
+            assert_eq!(row.get_string(0), expected_c2, "c2 mismatch at index 
{}", i);
         }
     }
 

Reply via email to