This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 18af060  chore: Improve read path error handling logic (#143)
18af060 is described below

commit 18af060a70f12567ad56ff5dbacf3ca52f2abb75
Author: AlexZhao <[email protected]>
AuthorDate: Sat Jan 17 14:33:57 2026 +0800

    chore: Improve read path error handling logic (#143)
---
 crates/fluss/src/client/table/log_fetch_buffer.rs | 378 ++++++++++++-
 crates/fluss/src/client/table/remote_log.rs       |   2 +-
 crates/fluss/src/client/table/scanner.rs          | 628 +++++++++++++++++-----
 crates/fluss/src/client/write/sender.rs           |   2 +-
 crates/fluss/src/error.rs                         |   5 +
 crates/fluss/src/record/arrow.rs                  |  67 ++-
 crates/fluss/src/record/mod.rs                    |  62 +++
 crates/fluss/src/row/column.rs                    |  64 +++
 crates/fluss/src/rpc/message/list_offsets.rs      |  52 +-
 crates/fluss/src/util/mod.rs                      |  54 ++
 10 files changed, 1122 insertions(+), 192 deletions(-)

diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index c55c994..fb6981f 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -18,7 +18,7 @@
 use arrow::array::RecordBatch;
 use parking_lot::Mutex;
 
-use crate::error::Result;
+use crate::error::{ApiError, Error, Result};
 use crate::metadata::TableBucket;
 use crate::record::{
     LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, 
ScanRecord,
@@ -29,12 +29,38 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use tokio::sync::Notify;
 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum FetchErrorAction {
+    Ignore,
+    LogOffsetOutOfRange,
+    Authorization,
+    CorruptMessage,
+    Unexpected,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum FetchErrorLogLevel {
+    Debug,
+    Warn,
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct FetchErrorContext {
+    pub(crate) action: FetchErrorAction,
+    pub(crate) log_level: FetchErrorLogLevel,
+    pub(crate) log_message: String,
+}
+
 /// Represents a completed fetch that can be consumed
 pub trait CompletedFetch: Send + Sync {
     fn table_bucket(&self) -> &TableBucket;
+    fn api_error(&self) -> Option<&ApiError>;
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext>;
+    fn take_error(&mut self) -> Option<Error>;
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
     fn fetch_batches(&mut self, max_batches: usize) -> 
Result<Vec<RecordBatch>>;
     fn is_consumed(&self) -> bool;
+    fn records_read(&self) -> usize;
     fn drain(&mut self);
     fn size_in_bytes(&self) -> usize;
     fn high_watermark(&self) -> i64;
@@ -52,6 +78,7 @@ pub trait PendingFetch: Send + Sync {
 
 /// Thread-safe buffer for completed fetches
 pub struct LogFetchBuffer {
+    read_context: ReadContext,
     completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
     pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn 
PendingFetch>>>>,
     next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
@@ -60,8 +87,9 @@ pub struct LogFetchBuffer {
 }
 
 impl LogFetchBuffer {
-    pub fn new() -> Self {
+    pub fn new(read_context: ReadContext) -> Self {
         Self {
+            read_context,
             completed_fetches: Mutex::new(VecDeque::new()),
             pending_fetches: Mutex::new(HashMap::new()),
             next_in_line_fetch: Mutex::new(None),
@@ -75,26 +103,28 @@ impl LogFetchBuffer {
         self.completed_fetches.lock().is_empty()
     }
 
-    /// Wait for the buffer to become non-empty, with timeout
-    /// Returns true if data became available, false if timeout
-    pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+    /// Wait for the buffer to become non-empty, with timeout.
+    /// Returns true if data became available, false if timeout.
+    pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
         let deadline = std::time::Instant::now() + timeout;
 
         loop {
             // Check if buffer is not empty
             if !self.is_empty() {
-                return true;
+                return Ok(true);
             }
 
             // Check if woken up
             if self.woken_up.swap(false, Ordering::Acquire) {
-                return true;
+                return Err(Error::WakeupError {
+                    message: "The await operation was interrupted by 
wakeup.".to_string(),
+                });
             }
 
             // Check if timeout
             let now = std::time::Instant::now();
             if now >= deadline {
-                return false;
+                return Ok(false);
             }
 
             // Wait for notification with remaining time
@@ -102,7 +132,7 @@ impl LogFetchBuffer {
             let notified = self.not_empty_notify.notified();
             tokio::select! {
                 _ = tokio::time::sleep(remaining) => {
-                    return false; // Timeout
+                    return Ok(false); // Timeout
                 }
                 _ = notified => {
                     // Got notification, check again
@@ -119,6 +149,26 @@ impl LogFetchBuffer {
         self.not_empty_notify.notify_waiters();
     }
 
+    pub(crate) fn add_api_error(
+        &self,
+        table_bucket: TableBucket,
+        api_error: ApiError,
+        fetch_error_context: FetchErrorContext,
+        fetch_offset: i64,
+    ) {
+        let error_fetch = DefaultCompletedFetch::from_api_error(
+            table_bucket,
+            api_error,
+            fetch_error_context,
+            fetch_offset,
+            self.read_context.clone(),
+        );
+        self.completed_fetches
+            .lock()
+            .push_back(Box::new(error_fetch));
+        self.not_empty_notify.notify_waiters();
+    }
+
     /// Add a pending fetch to the buffer
     pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
         let table_bucket = pending_fetch.table_bucket().clone();
@@ -136,6 +186,7 @@ impl LogFetchBuffer {
         // holding both locks simultaneously.
         let mut completed_to_push: Vec<Box<dyn CompletedFetch>> = Vec::new();
         let mut has_completed = false;
+        let mut pending_error: Option<Error> = None;
         {
             let mut pending_map = self.pending_fetches.lock();
             if let Some(pendings) = pending_map.get_mut(table_bucket) {
@@ -148,8 +199,9 @@ impl LogFetchBuffer {
                                 has_completed = true;
                             }
                             Err(e) => {
-                                // todo: handle exception?
-                                log::error!("Error when completing: {e}");
+                                pending_error = Some(e);
+                                has_completed = true;
+                                break;
                             }
                         }
                     } else {
@@ -162,11 +214,22 @@ impl LogFetchBuffer {
             }
         }
 
+        if let Some(error) = pending_error {
+            let error_fetch = DefaultCompletedFetch::from_error(
+                table_bucket.clone(),
+                error,
+                -1,
+                self.read_context.clone(),
+            );
+            completed_to_push.push(Box::new(error_fetch));
+        }
+
         if !completed_to_push.is_empty() {
             let mut completed_queue = self.completed_fetches.lock();
             for completed in completed_to_push {
                 completed_queue.push_back(completed);
             }
+            has_completed = true;
         }
 
         if has_completed {
@@ -236,12 +299,6 @@ impl LogFetchBuffer {
     }
 }
 
-impl Default for LogFetchBuffer {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 /// A wrapper that makes a completed fetch look like a pending fetch
 struct CompletedPendingFetch {
     completed_fetch: Box<dyn CompletedFetch>,
@@ -270,6 +327,9 @@ impl PendingFetch for CompletedPendingFetch {
 /// Default implementation of CompletedFetch for in-memory log records
 pub struct DefaultCompletedFetch {
     table_bucket: TableBucket,
+    api_error: Option<ApiError>,
+    fetch_error_context: Option<FetchErrorContext>,
+    error: Option<Error>,
     log_record_batch: LogRecordsBatches,
     read_context: ReadContext,
     next_fetch_offset: i64,
@@ -280,6 +340,9 @@ pub struct DefaultCompletedFetch {
     records_read: usize,
     current_record_iterator: Option<LogRecordIterator>,
     current_record_batch: Option<LogRecordBatch>,
+    last_record: Option<ScanRecord>,
+    cached_record_error: Option<String>,
+    corrupt_last_record: bool,
 }
 
 impl DefaultCompletedFetch {
@@ -290,9 +353,12 @@ impl DefaultCompletedFetch {
         read_context: ReadContext,
         fetch_offset: i64,
         high_watermark: i64,
-    ) -> Result<Self> {
-        Ok(Self {
+    ) -> Self {
+        Self {
             table_bucket,
+            api_error: None,
+            fetch_error_context: None,
+            error: None,
             log_record_batch,
             read_context,
             next_fetch_offset: fetch_offset,
@@ -303,7 +369,65 @@ impl DefaultCompletedFetch {
             records_read: 0,
             current_record_iterator: None,
             current_record_batch: None,
-        })
+            last_record: None,
+            cached_record_error: None,
+            corrupt_last_record: false,
+        }
+    }
+
+    pub(crate) fn from_error(
+        table_bucket: TableBucket,
+        error: Error,
+        fetch_offset: i64,
+        read_context: ReadContext,
+    ) -> Self {
+        Self {
+            table_bucket,
+            api_error: None,
+            fetch_error_context: None,
+            error: Some(error),
+            log_record_batch: LogRecordsBatches::new(Vec::new()),
+            read_context,
+            next_fetch_offset: fetch_offset,
+            high_watermark: -1,
+            size_in_bytes: 0,
+            consumed: false,
+            initialized: false,
+            records_read: 0,
+            current_record_iterator: None,
+            current_record_batch: None,
+            last_record: None,
+            cached_record_error: None,
+            corrupt_last_record: false,
+        }
+    }
+
+    pub(crate) fn from_api_error(
+        table_bucket: TableBucket,
+        api_error: ApiError,
+        fetch_error_context: FetchErrorContext,
+        fetch_offset: i64,
+        read_context: ReadContext,
+    ) -> Self {
+        Self {
+            table_bucket,
+            api_error: Some(api_error),
+            fetch_error_context: Some(fetch_error_context),
+            error: None,
+            log_record_batch: LogRecordsBatches::new(Vec::new()),
+            read_context,
+            next_fetch_offset: fetch_offset,
+            high_watermark: -1,
+            size_in_bytes: 0,
+            consumed: false,
+            initialized: false,
+            records_read: 0,
+            current_record_iterator: None,
+            current_record_batch: None,
+            last_record: None,
+            cached_record_error: None,
+            corrupt_last_record: false,
+        }
     }
 
     /// Get the next fetched record, handling batch iteration and record 
skipping
@@ -330,6 +454,19 @@ impl DefaultCompletedFetch {
         }
     }
 
+    fn fetch_error(&self) -> Error {
+        let mut message = format!(
+            "Received exception when fetching the next record from 
{table_bucket}. If needed, please back to past the record to continue 
scanning.",
+            table_bucket = self.table_bucket
+        );
+        if let Some(cause) = self.cached_record_error.as_deref() {
+            message.push_str(&format!(" Cause: {cause}"));
+        }
+        Error::UnexpectedError {
+            message,
+            source: None,
+        }
+    }
     /// Get the next batch directly without row iteration
     fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
         loop {
@@ -368,8 +505,36 @@ impl CompletedFetch for DefaultCompletedFetch {
         &self.table_bucket
     }
 
+    fn api_error(&self) -> Option<&ApiError> {
+        self.api_error.as_ref()
+    }
+
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+        self.fetch_error_context.as_ref()
+    }
+
+    fn take_error(&mut self) -> Option<Error> {
+        self.error.take()
+    }
+
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> 
{
-        // todo: handle corrupt_last_record
+        if let Some(error) = self.error.take() {
+            return Err(error);
+        }
+
+        if let Some(api_error) = self.api_error.as_ref() {
+            return Err(Error::FlussAPIError {
+                api_error: ApiError {
+                    code: api_error.code,
+                    message: api_error.message.clone(),
+                },
+            });
+        }
+
+        if self.corrupt_last_record {
+            return Err(self.fetch_error());
+        }
+
         if self.consumed {
             return Ok(Vec::new());
         }
@@ -377,19 +542,53 @@ impl CompletedFetch for DefaultCompletedFetch {
         let mut scan_records = Vec::new();
 
         for _ in 0..max_records {
-            if let Some(record) = self.next_fetched_record()? {
-                self.next_fetch_offset = record.offset() + 1;
-                self.records_read += 1;
-                scan_records.push(record);
-            } else {
-                break;
+            if self.cached_record_error.is_none() {
+                self.corrupt_last_record = true;
+                match self.next_fetched_record() {
+                    Ok(Some(record)) => {
+                        self.corrupt_last_record = false;
+                        self.last_record = Some(record);
+                    }
+                    Ok(None) => {
+                        self.corrupt_last_record = false;
+                        self.last_record = None;
+                    }
+                    Err(e) => {
+                        self.cached_record_error = Some(e.to_string());
+                    }
+                }
             }
+
+            let Some(record) = self.last_record.take() else {
+                break;
+            };
+
+            self.next_fetch_offset = record.offset() + 1;
+            self.records_read += 1;
+            scan_records.push(record);
+        }
+
+        if self.cached_record_error.is_some() && scan_records.is_empty() {
+            return Err(self.fetch_error());
         }
 
         Ok(scan_records)
     }
 
     fn fetch_batches(&mut self, max_batches: usize) -> 
Result<Vec<RecordBatch>> {
+        if let Some(error) = self.error.take() {
+            return Err(error);
+        }
+
+        if let Some(api_error) = self.api_error.as_ref() {
+            return Err(Error::FlussAPIError {
+                api_error: ApiError {
+                    code: api_error.code,
+                    message: api_error.message.clone(),
+                },
+            });
+        }
+
         if self.consumed {
             return Ok(Vec::new());
         }
@@ -410,8 +609,18 @@ impl CompletedFetch for DefaultCompletedFetch {
         self.consumed
     }
 
+    fn records_read(&self) -> usize {
+        self.records_read
+    }
+
     fn drain(&mut self) {
         self.consumed = true;
+        self.api_error = None;
+        self.fetch_error_context = None;
+        self.error = None;
+        self.cached_record_error = None;
+        self.corrupt_last_record = false;
+        self.last_record = None;
     }
 
     fn size_in_bytes(&self) -> usize {
@@ -434,3 +643,118 @@ impl CompletedFetch for DefaultCompletedFetch {
         self.next_fetch_offset
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::WriteRecord;
+    use crate::compression::{
+        ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+    };
+    use crate::metadata::{DataField, DataTypes, TablePath};
+    use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, 
to_arrow_schema};
+    use crate::row::GenericRow;
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    fn test_read_context() -> ReadContext {
+        let row_type = DataTypes::row(vec![DataField::new(
+            "id".to_string(),
+            DataTypes::int(),
+            None,
+        )]);
+        ReadContext::new(to_arrow_schema(&row_type), false)
+    }
+
+    struct ErrorPendingFetch {
+        table_bucket: TableBucket,
+    }
+
+    impl PendingFetch for ErrorPendingFetch {
+        fn table_bucket(&self) -> &TableBucket {
+            &self.table_bucket
+        }
+
+        fn is_completed(&self) -> bool {
+            true
+        }
+
+        fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn 
CompletedFetch>> {
+            Err(Error::UnexpectedError {
+                message: "pending fetch failure".to_string(),
+                source: None,
+            })
+        }
+    }
+
+    #[tokio::test]
+    async fn await_not_empty_returns_wakeup_error() {
+        let buffer = LogFetchBuffer::new(test_read_context());
+        buffer.wakeup();
+
+        let result = buffer.await_not_empty(Duration::from_millis(10)).await;
+        assert!(matches!(result, Err(Error::WakeupError { .. })));
+    }
+
+    #[tokio::test]
+    async fn await_not_empty_returns_pending_error() {
+        let buffer = LogFetchBuffer::new(test_read_context());
+        let table_bucket = TableBucket::new(1, 0);
+        buffer.pend(Box::new(ErrorPendingFetch {
+            table_bucket: table_bucket.clone(),
+        }));
+        buffer.try_complete(&table_bucket);
+
+        let result = buffer.await_not_empty(Duration::from_millis(10)).await;
+        assert!(matches!(result, Ok(true)));
+
+        let mut completed = buffer.poll().expect("completed fetch");
+        assert!(completed.take_error().is_some());
+    }
+
+    #[test]
+    fn default_completed_fetch_reads_records() -> Result<()> {
+        let row_type = DataTypes::row(vec![
+            DataField::new("id".to_string(), DataTypes::int(), None),
+            DataField::new("name".to_string(), DataTypes::string(), None),
+        ]);
+        let table_path = Arc::new(TablePath::new("db".to_string(), 
"tbl".to_string()));
+
+        let mut builder = MemoryLogRecordsArrowBuilder::new(
+            1,
+            &row_type,
+            false,
+            ArrowCompressionInfo {
+                compression_type: ArrowCompressionType::None,
+                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+            },
+        );
+
+        let mut row = GenericRow::new();
+        row.set_field(0, 1_i32);
+        row.set_field(1, "alice");
+        let record = WriteRecord::new(table_path, row);
+        builder.append(&record)?;
+
+        let data = builder.build()?;
+        let log_records = LogRecordsBatches::new(data.clone());
+        let read_context = ReadContext::new(to_arrow_schema(&row_type), false);
+        let mut fetch = DefaultCompletedFetch::new(
+            TableBucket::new(1, 0),
+            log_records,
+            data.len(),
+            read_context,
+            0,
+            0,
+        );
+
+        let records = fetch.fetch_records(10)?;
+        assert_eq!(records.len(), 1);
+        assert_eq!(records[0].offset(), 0);
+
+        let empty = fetch.fetch_records(10)?;
+        assert!(empty.is_empty());
+
+        Ok(())
+    }
+}
diff --git a/crates/fluss/src/client/table/remote_log.rs 
b/crates/fluss/src/client/table/remote_log.rs
index d9abd19..0142515 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -409,7 +409,7 @@ impl PendingFetch for RemotePendingFetch {
             self.read_context,
             self.fetch_offset,
             self.high_watermark,
-        )?;
+        );
 
         Ok(Box::new(completed_fetch))
     }
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 7d22324..3e7d61f 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -17,7 +17,7 @@
 
 use arrow::array::RecordBatch;
 use arrow_schema::SchemaRef;
-use log::{debug, error, warn};
+use log::{debug, warn};
 use parking_lot::{Mutex, RwLock};
 use std::collections::{HashMap, HashSet};
 use std::slice::from_ref;
@@ -29,16 +29,17 @@ use crate::client::connection::FlussConnection;
 use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
 use crate::client::table::log_fetch_buffer::{
-    CompletedFetch, DefaultCompletedFetch, LogFetchBuffer,
+    CompletedFetch, DefaultCompletedFetch, FetchErrorAction, 
FetchErrorContext, FetchErrorLogLevel,
+    LogFetchBuffer,
 };
 use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
-use crate::error::{Error, Result, RpcError};
+use crate::error::{ApiError, Error, FlussError, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
-use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTable};
+use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
-use crate::rpc::{RpcClient, message};
+use crate::rpc::{RpcClient, RpcError, message};
 use crate::util::FairBucketStatusMap;
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
@@ -318,7 +319,7 @@ impl LogScannerInner {
                 .log_fetcher
                 .log_fetch_buffer
                 .await_not_empty(remaining)
-                .await;
+                .await?;
 
             if !has_data {
                 // Timeout while waiting
@@ -396,7 +397,7 @@ impl LogScannerInner {
                 .log_fetcher
                 .log_fetch_buffer
                 .await_not_empty(remaining)
-                .await;
+                .await?;
 
             if !has_data {
                 return Ok(Vec::new());
@@ -448,6 +449,8 @@ impl RecordBatchLogScanner {
 struct LogFetcher {
     conns: Arc<RpcClient>,
     metadata: Arc<Metadata>,
+    table_path: TablePath,
+    is_partitioned: bool,
     log_scanner_status: Arc<LogScannerStatus>,
     read_context: ReadContext,
     remote_read_context: ReadContext,
@@ -457,8 +460,6 @@ struct LogFetcher {
     credentials_cache: Arc<CredentialsCache>,
     log_fetch_buffer: Arc<LogFetchBuffer>,
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
-    table_path: TablePath,
-    is_partitioned: bool,
 }
 
 impl LogFetcher {
@@ -471,24 +472,25 @@ impl LogFetcher {
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
         let read_context =
-            Self::create_read_context(full_arrow_schema.clone(), 
projected_fields.clone(), false);
+            Self::create_read_context(full_arrow_schema.clone(), 
projected_fields.clone(), false)?;
         let remote_read_context =
-            Self::create_read_context(full_arrow_schema, 
projected_fields.clone(), true);
+            Self::create_read_context(full_arrow_schema, 
projected_fields.clone(), true)?;
 
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
+        let log_fetch_buffer = 
Arc::new(LogFetchBuffer::new(read_context.clone()));
 
         Ok(LogFetcher {
             conns: conns.clone(),
             metadata: metadata.clone(),
+            table_path: table_info.table_path.clone(),
+            is_partitioned: table_info.is_partitioned(),
             log_scanner_status,
             read_context,
             remote_read_context,
             remote_log_downloader: 
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
             credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), 
metadata.clone())),
-            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            log_fetch_buffer,
             nodes_with_pending_fetch_requests: 
Arc::new(Mutex::new(HashSet::new())),
-            table_path: table_info.table_path.clone(),
-            is_partitioned: table_info.is_partitioned(),
         })
     }
 
@@ -496,23 +498,79 @@ impl LogFetcher {
         full_arrow_schema: SchemaRef,
         projected_fields: Option<Vec<usize>>,
         is_from_remote: bool,
-    ) -> ReadContext {
+    ) -> Result<ReadContext> {
         match projected_fields {
-            None => ReadContext::new(full_arrow_schema, is_from_remote),
+            None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)),
             Some(fields) => {
                 ReadContext::with_projection_pushdown(full_arrow_schema, 
fields, is_from_remote)
             }
         }
     }
 
-    async fn check_and_update_metadata(&self) -> Result<()> {
-        if self.is_partitioned {
-            // TODO: Implement partition-aware metadata refresh for buckets 
whose leaders are unknown.
-            // The implementation will likely need to collect partition IDs 
for such buckets and
-            // perform targeted metadata updates. Until then, we avoid 
computing unused partition_ids.
-            return Ok(());
+    fn describe_fetch_error(
+        error: FlussError,
+        table_bucket: &TableBucket,
+        fetch_offset: i64,
+        error_message: &str,
+    ) -> FetchErrorContext {
+        match error {
+            FlussError::NotLeaderOrFollower
+            | FlussError::LogStorageException
+            | FlussError::KvStorageException
+            | FlussError::StorageException
+            | FlussError::FencedLeaderEpochException => FetchErrorContext {
+                action: FetchErrorAction::Ignore,
+                log_level: FetchErrorLogLevel::Debug,
+                log_message: format!(
+                    "Error in fetch for bucket {table_bucket}: {error:?}: 
{error_message}"
+                ),
+            },
+            FlussError::UnknownTableOrBucketException => FetchErrorContext {
+                action: FetchErrorAction::Ignore,
+                log_level: FetchErrorLogLevel::Warn,
+                log_message: format!(
+                    "Received unknown table or bucket error in fetch for 
bucket {table_bucket}"
+                ),
+            },
+            FlussError::LogOffsetOutOfRangeException => FetchErrorContext {
+                action: FetchErrorAction::LogOffsetOutOfRange,
+                log_level: FetchErrorLogLevel::Debug,
+                log_message: format!(
+                    "The fetching offset {fetch_offset} is out of range for 
bucket {table_bucket}: {error_message}"
+                ),
+            },
+            FlussError::AuthorizationException => FetchErrorContext {
+                action: FetchErrorAction::Authorization,
+                log_level: FetchErrorLogLevel::Debug,
+                log_message: format!(
+                    "Authorization error while fetching offset {fetch_offset} 
for bucket {table_bucket}: {error_message}"
+                ),
+            },
+            FlussError::UnknownServerError => FetchErrorContext {
+                action: FetchErrorAction::Ignore,
+                log_level: FetchErrorLogLevel::Warn,
+                log_message: format!(
+                    "Unknown server error while fetching offset {fetch_offset} 
for bucket {table_bucket}: {error_message}"
+                ),
+            },
+            FlussError::CorruptMessage => FetchErrorContext {
+                action: FetchErrorAction::CorruptMessage,
+                log_level: FetchErrorLogLevel::Debug,
+                log_message: format!(
+                    "Encountered corrupt message when fetching offset 
{fetch_offset} for bucket {table_bucket}: {error_message}"
+                ),
+            },
+            _ => FetchErrorContext {
+                action: FetchErrorAction::Unexpected,
+                log_level: FetchErrorLogLevel::Debug,
+                log_message: format!(
+                    "Unexpected error code {error:?} while fetching at offset 
{fetch_offset} from bucket {table_bucket}: {error_message}"
+                ),
+            },
         }
+    }
 
+    async fn check_and_update_metadata(&self) -> Result<()> {
         let need_update = self
             .fetchable_buckets()
             .iter()
@@ -522,6 +580,26 @@ impl LogFetcher {
             return Ok(());
         }
 
+        if self.is_partitioned {
+            // Fallback to full table metadata refresh until partition-aware 
updates are available.
+            self.metadata
+                .update_tables_metadata(&HashSet::from([&self.table_path]))
+                .await
+                .or_else(|e| {
+                    if let Error::RpcError { source, .. } = &e
+                        && matches!(source, RpcError::ConnectionError(_) | 
RpcError::Poisoned(_))
+                    {
+                        warn!(
+                            "Retrying after encountering error while updating 
table metadata: {e}"
+                        );
+                        Ok(())
+                    } else {
+                        Err(e)
+                    }
+                })?;
+            return Ok(());
+        }
+
         // TODO: Handle PartitionNotExist error
         self.metadata
             .update_tables_metadata(&HashSet::from([&self.table_path]))
@@ -561,7 +639,6 @@ impl LogFetcher {
             let creds_cache = self.credentials_cache.clone();
             let nodes_with_pending = 
self.nodes_with_pending_fetch_requests.clone();
             let metadata = self.metadata.clone();
-
             // Spawn async task to handle the fetch request
             // Note: These tasks are not explicitly tracked or cancelled when 
LogFetcher is dropped.
             // This is acceptable because:
@@ -607,7 +684,7 @@ impl LogFetcher {
                     }
                 };
 
-                if let Err(e) = Self::handle_fetch_response(
+                Self::handle_fetch_response(
                     fetch_response,
                     &log_fetch_buffer,
                     &log_scanner_status,
@@ -616,10 +693,7 @@ impl LogFetcher {
                     &remote_log_downloader,
                     &creds_cache,
                 )
-                .await
-                {
-                    error!("Fail to handle fetch response: {e:?}");
-                }
+                .await;
             });
         }
 
@@ -644,7 +718,7 @@ impl LogFetcher {
         remote_read_context: &ReadContext,
         remote_log_downloader: &Arc<RemoteLogDownloader>,
         credentials_cache: &Arc<CredentialsCache>,
-    ) -> Result<()> {
+    ) {
         for pb_fetch_log_resp in fetch_response.tables_resp {
             let table_id = pb_fetch_log_resp.table_id;
             let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
@@ -661,11 +735,45 @@ impl LogFetcher {
                     continue;
                 };
 
+                if let Some(error_code) = fetch_log_for_bucket.error_code
+                    && error_code != FlussError::None.code()
+                {
+                    let api_error: ApiError = ErrorResponse {
+                        error_code,
+                        error_message: 
fetch_log_for_bucket.error_message.clone(),
+                    }
+                    .into();
+
+                    let error = FlussError::for_code(error_code);
+                    let error_context = Self::describe_fetch_error(
+                        error,
+                        &table_bucket,
+                        fetch_offset,
+                        api_error.message.as_str(),
+                    );
+                    
log_scanner_status.move_bucket_to_end(table_bucket.clone());
+                    match error_context.log_level {
+                        FetchErrorLogLevel::Debug => {
+                            debug!("{}", error_context.log_message);
+                        }
+                        FetchErrorLogLevel::Warn => {
+                            warn!("{}", error_context.log_message);
+                        }
+                    }
+                    log_fetch_buffer.add_api_error(
+                        table_bucket.clone(),
+                        api_error,
+                        error_context,
+                        fetch_offset,
+                    );
+                    continue;
+                }
+
                 // Check if this is a remote log fetch
                 if let Some(ref remote_log_fetch_info) = 
fetch_log_for_bucket.remote_log_fetch_info
                 {
                     // set remote fs props
-                    let remote_fs_props = 
credentials_cache.get_or_refresh().await?;
+                    let remote_fs_props = 
credentials_cache.get_or_refresh().await.unwrap();
                     remote_log_downloader.set_remote_fs_props(remote_fs_props);
 
                     let remote_fetch_info =
@@ -688,26 +796,18 @@ impl LogFetcher {
                     let size_in_bytes = records.len();
                     let log_record_batch = LogRecordsBatches::new(records);
 
-                    match DefaultCompletedFetch::new(
+                    let completed_fetch = DefaultCompletedFetch::new(
                         table_bucket.clone(),
                         log_record_batch,
                         size_in_bytes,
                         read_context.clone(),
                         fetch_offset,
                         high_watermark,
-                    ) {
-                        Ok(completed_fetch) => {
-                            log_fetch_buffer.add(Box::new(completed_fetch));
-                        }
-                        Err(e) => {
-                            // todo: handle error
-                            log::warn!("Failed to create completed fetch: 
{e:?}");
-                        }
-                    }
+                    );
+                    log_fetch_buffer.add(Box::new(completed_fetch));
                 }
             }
         }
-        Ok(())
     }
 
     fn pending_remote_fetches(
@@ -763,69 +863,91 @@ impl LogFetcher {
         let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
         let mut records_remaining = MAX_POLL_RECORDS;
 
-        while records_remaining > 0 {
-            // Get the next in line fetch, or get a new one from buffer
-            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
-
-            if next_in_line.is_none() || 
next_in_line.as_ref().unwrap().is_consumed() {
-                // Get a new fetch from buffer
-                if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
-                    // Initialize the fetch if not already initialized
-                    if !completed_fetch.is_initialized() {
-                        let size_in_bytes = completed_fetch.size_in_bytes();
-                        match self.initialize_fetch(completed_fetch) {
-                            Ok(initialized) => {
-                                
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
-                                continue;
+        let collect_result: Result<()> = {
+            while records_remaining > 0 {
+                // Get the next in line fetch, or get a new one from buffer
+                let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+                if next_in_line.is_none() || 
next_in_line.as_ref().unwrap().is_consumed() {
+                    // Get a new fetch from buffer
+                    if let Some(completed_fetch) = 
self.log_fetch_buffer.poll() {
+                        // Initialize the fetch if not already initialized
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = 
completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    // Remove a completedFetch upon a parse 
with exception if
+                                    // (1) it contains no records, and
+                                    // (2) there are no fetched records with 
actual content preceding this
+                                    // exception.
+                                    if result.is_empty() && size_in_bytes == 0 
{
+                                        // todo: do we need to consider it 
like java ?
+                                        // self.log_fetch_buffer.poll();
+                                    }
+                                    return Err(e);
+                                }
                             }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                        // Note: poll() already removed the fetch from buffer, 
so no need to call poll()
+                    } else {
+                        // No more fetches available
+                        break;
+                    }
+                } else {
+                    // Fetch records from next_in_line
+                    if let Some(mut next_fetch) = next_in_line {
+                        let records = match self
+                            .fetch_records_from_fetch(&mut next_fetch, 
records_remaining)
+                        {
+                            Ok(records) => records,
                             Err(e) => {
-                                // Remove a completedFetch upon a parse with 
exception if
-                                // (1) it contains no records, and
-                                // (2) there are no fetched records with 
actual content preceding this
-                                // exception.
-                                if result.is_empty() && size_in_bytes == 0 {
-                                    // todo: do we need to consider it like 
java ?
-                                    // self.log_fetch_buffer.poll();
+                                if !next_fetch.is_consumed() {
+                                    self.log_fetch_buffer
+                                        
.set_next_in_line_fetch(Some(next_fetch));
                                 }
                                 return Err(e);
                             }
+                        };
+
+                        if !records.is_empty() {
+                            let table_bucket = 
next_fetch.table_bucket().clone();
+                            // Merge with existing records for this bucket
+                            let existing = 
result.entry(table_bucket).or_default();
+                            let records_count = records.len();
+                            existing.extend(records);
+
+                            records_remaining = 
records_remaining.saturating_sub(records_count);
                         }
-                    } else {
-                        self.log_fetch_buffer
-                            .set_next_in_line_fetch(Some(completed_fetch));
+
+                        // If the fetch is not fully consumed, put it back for 
the next round
+                        if !next_fetch.is_consumed() {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(next_fetch));
+                        }
+                        // If consumed, next_fetch will be dropped here (which 
is correct)
                     }
-                    // Note: poll() already removed the fetch from buffer, so 
no need to call poll()
-                } else {
-                    // No more fetches available
-                    break;
                 }
-            } else {
-                // Fetch records from next_in_line
-                if let Some(mut next_fetch) = next_in_line {
-                    let records =
-                        self.fetch_records_from_fetch(&mut next_fetch, 
records_remaining)?;
-
-                    if !records.is_empty() {
-                        let table_bucket = next_fetch.table_bucket().clone();
-                        // Merge with existing records for this bucket
-                        let existing = result.entry(table_bucket).or_default();
-                        let records_count = records.len();
-                        existing.extend(records);
-
-                        records_remaining = 
records_remaining.saturating_sub(records_count);
-                    }
+            }
+            Ok(())
+        };
 
-                    // If the fetch is not fully consumed, put it back for the 
next round
-                    if !next_fetch.is_consumed() {
-                        self.log_fetch_buffer
-                            .set_next_in_line_fetch(Some(next_fetch));
-                    }
-                    // If consumed, next_fetch will be dropped here (which is 
correct)
+        match collect_result {
+            Ok(()) => Ok(result),
+            Err(e) => {
+                if result.is_empty() {
+                    Err(e)
+                } else {
+                    Ok(result)
                 }
             }
         }
-
-        Ok(result)
     }
 
     /// Initialize a completed fetch, checking offset match and updating high 
watermark
@@ -833,12 +955,63 @@ impl LogFetcher {
         &self,
         mut completed_fetch: Box<dyn CompletedFetch>,
     ) -> Result<Option<Box<dyn CompletedFetch>>> {
-        // todo: handle error in initialize fetch
-        let table_bucket = completed_fetch.table_bucket();
+        if let Some(error) = completed_fetch.take_error() {
+            return Err(error);
+        }
+
+        let table_bucket = completed_fetch.table_bucket().clone();
         let fetch_offset = completed_fetch.next_fetch_offset();
 
+        if let Some(api_error) = completed_fetch.api_error() {
+            let error = FlussError::for_code(api_error.code);
+            let error_message = api_error.message.as_str();
+            self.log_scanner_status
+                .move_bucket_to_end(table_bucket.clone());
+            let action = completed_fetch
+                .fetch_error_context()
+                .map(|context| context.action)
+                .unwrap_or(FetchErrorAction::Unexpected);
+            match action {
+                FetchErrorAction::Ignore => {
+                    return Ok(None);
+                }
+                FetchErrorAction::LogOffsetOutOfRange => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "The fetching offset {fetch_offset} is out of 
range: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+                FetchErrorAction::Authorization => {
+                    return Err(Error::FlussAPIError {
+                        api_error: ApiError {
+                            code: api_error.code,
+                            message: api_error.message.to_string(),
+                        },
+                    });
+                }
+                FetchErrorAction::CorruptMessage => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "Encountered corrupt message when fetching offset 
{fetch_offset} for bucket {table_bucket}: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+                FetchErrorAction::Unexpected => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "Unexpected error code {error:?} while fetching at 
offset {fetch_offset} from bucket {table_bucket}: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+            }
+        }
+
         // Check if bucket is still subscribed
-        let Some(current_offset) = 
self.log_scanner_status.get_bucket_offset(table_bucket) else {
+        let Some(current_offset) = 
self.log_scanner_status.get_bucket_offset(&table_bucket) else {
             warn!(
                 "Discarding stale fetch response for bucket {table_bucket:?} 
since the bucket has been unsubscribed"
             );
@@ -857,7 +1030,7 @@ impl LogFetcher {
         let high_watermark = completed_fetch.high_watermark();
         if high_watermark >= 0 {
             self.log_scanner_status
-                .update_high_watermark(table_bucket, high_watermark);
+                .update_high_watermark(&table_bucket, high_watermark);
         }
 
         completed_fetch.set_initialized();
@@ -894,6 +1067,11 @@ impl LogFetcher {
                     .update_offset(&table_bucket, next_fetch_offset);
             }
 
+            if next_in_line_fetch.is_consumed() && 
next_in_line_fetch.records_read() > 0 {
+                self.log_scanner_status
+                    .move_bucket_to_end(table_bucket.clone());
+            }
+
             Ok(records)
         } else {
             // These records aren't next in line, ignore them
@@ -915,58 +1093,70 @@ impl LogFetcher {
         let mut batches_remaining = MAX_BATCHES;
         let mut bytes_consumed: usize = 0;
 
-        while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
-            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+        let collect_result: Result<()> = {
+            while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+                let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
 
-            match next_in_line {
-                Some(mut next_fetch) if !next_fetch.is_consumed() => {
-                    let batches =
-                        self.fetch_batches_from_fetch(&mut next_fetch, 
batches_remaining)?;
-                    let batch_count = batches.len();
+                match next_in_line {
+                    Some(mut next_fetch) if !next_fetch.is_consumed() => {
+                        let batches =
+                            self.fetch_batches_from_fetch(&mut next_fetch, 
batches_remaining)?;
+                        let batch_count = batches.len();
 
-                    if !batches.is_empty() {
-                        // Track bytes consumed (soft cap - may exceed by one 
fetch)
-                        let batch_bytes: usize =
-                            batches.iter().map(|b| 
b.get_array_memory_size()).sum();
-                        bytes_consumed += batch_bytes;
+                        if !batches.is_empty() {
+                            // Track bytes consumed (soft cap - may exceed by 
one fetch)
+                            let batch_bytes: usize =
+                                batches.iter().map(|b| 
b.get_array_memory_size()).sum();
+                            bytes_consumed += batch_bytes;
 
-                        result.extend(batches);
-                        batches_remaining = 
batches_remaining.saturating_sub(batch_count);
-                    }
+                            result.extend(batches);
+                            batches_remaining = 
batches_remaining.saturating_sub(batch_count);
+                        }
 
-                    if !next_fetch.is_consumed() {
-                        self.log_fetch_buffer
-                            .set_next_in_line_fetch(Some(next_fetch));
+                        if !next_fetch.is_consumed() {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(next_fetch));
+                        }
                     }
-                }
-                _ => {
-                    if let Some(completed_fetch) = 
self.log_fetch_buffer.poll() {
-                        if !completed_fetch.is_initialized() {
-                            let size_in_bytes = 
completed_fetch.size_in_bytes();
-                            match self.initialize_fetch(completed_fetch) {
-                                Ok(initialized) => {
-                                    
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
-                                    continue;
-                                }
-                                Err(e) => {
-                                    if result.is_empty() && size_in_bytes == 0 
{
+                    _ => {
+                        if let Some(completed_fetch) = 
self.log_fetch_buffer.poll() {
+                            if !completed_fetch.is_initialized() {
+                                let size_in_bytes = 
completed_fetch.size_in_bytes();
+                                match self.initialize_fetch(completed_fetch) {
+                                    Ok(initialized) => {
+                                        
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
                                         continue;
                                     }
-                                    return Err(e);
+                                    Err(e) => {
+                                        if result.is_empty() && size_in_bytes 
== 0 {
+                                            continue;
+                                        }
+                                        return Err(e);
+                                    }
                                 }
+                            } else {
+                                self.log_fetch_buffer
+                                    
.set_next_in_line_fetch(Some(completed_fetch));
                             }
                         } else {
-                            self.log_fetch_buffer
-                                .set_next_in_line_fetch(Some(completed_fetch));
+                            break;
                         }
-                    } else {
-                        break;
                     }
                 }
             }
-        }
+            Ok(())
+        };
 
-        Ok(result)
+        match collect_result {
+            Ok(()) => Ok(result),
+            Err(e) => {
+                if result.is_empty() {
+                    Err(e)
+                } else {
+                    Ok(result)
+                }
+            }
+        }
     }
 
     fn fetch_batches_from_fetch(
@@ -1231,3 +1421,175 @@ impl BucketScanStatus {
         *self.high_watermark.write() = high_watermark
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::WriteRecord;
+    use crate::client::metadata::Metadata;
+    use crate::compression::{
+        ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+    };
+    use crate::metadata::{TableInfo, TablePath};
+    use crate::record::MemoryLogRecordsArrowBuilder;
+    use crate::row::{Datum, GenericRow};
+    use crate::rpc::FlussError;
+    use crate::test_utils::{build_cluster_arc, build_table_info};
+
+    fn build_records(table_info: &TableInfo, table_path: Arc<TablePath>) -> 
Result<Vec<u8>> {
+        let mut builder = MemoryLogRecordsArrowBuilder::new(
+            1,
+            table_info.get_row_type(),
+            false,
+            ArrowCompressionInfo {
+                compression_type: ArrowCompressionType::None,
+                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+            },
+        );
+        let record = WriteRecord::new(
+            table_path,
+            GenericRow {
+                values: vec![Datum::Int32(1)],
+            },
+        );
+        builder.append(&record)?;
+        builder.build()
+    }
+
+    #[tokio::test]
+    async fn collect_fetches_updates_offset() -> Result<()> {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = build_table_info(table_path.clone(), 1, 1);
+        let cluster = build_cluster_arc(&table_path, 1, 1);
+        let metadata = Arc::new(Metadata::new_for_test(cluster));
+        let status = Arc::new(LogScannerStatus::new());
+        let fetcher = LogFetcher::new(
+            table_info.clone(),
+            Arc::new(RpcClient::new()),
+            metadata,
+            status.clone(),
+            None,
+        )?;
+
+        let bucket = TableBucket::new(1, 0);
+        status.assign_scan_bucket(bucket.clone(), 0);
+
+        let data = build_records(&table_info, Arc::new(table_path))?;
+        let log_records = LogRecordsBatches::new(data.clone());
+        let read_context = 
ReadContext::new(to_arrow_schema(table_info.get_row_type()), false);
+        let completed =
+            DefaultCompletedFetch::new(bucket.clone(), log_records, 
data.len(), read_context, 0, 0);
+        fetcher.log_fetch_buffer.add(Box::new(completed));
+
+        let fetched = fetcher.collect_fetches()?;
+        assert_eq!(fetched.get(&bucket).unwrap().len(), 1);
+        assert_eq!(status.get_bucket_offset(&bucket), Some(1));
+        Ok(())
+    }
+
+    #[test]
+    fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = build_table_info(table_path.clone(), 1, 1);
+        let cluster = build_cluster_arc(&table_path, 1, 1);
+        let metadata = Arc::new(Metadata::new_for_test(cluster));
+        let status = Arc::new(LogScannerStatus::new());
+        let fetcher = LogFetcher::new(
+            table_info.clone(),
+            Arc::new(RpcClient::new()),
+            metadata,
+            status,
+            None,
+        )?;
+
+        let bucket = TableBucket::new(1, 0);
+        let data = build_records(&table_info, Arc::new(table_path))?;
+        let log_records = LogRecordsBatches::new(data.clone());
+        let read_context = 
ReadContext::new(to_arrow_schema(table_info.get_row_type()), false);
+        let mut completed: Box<dyn CompletedFetch> = 
Box::new(DefaultCompletedFetch::new(
+            bucket,
+            log_records,
+            data.len(),
+            read_context,
+            0,
+            0,
+        ));
+
+        let records = fetcher.fetch_records_from_fetch(&mut completed, 10)?;
+        assert!(records.is_empty());
+        assert!(completed.is_consumed());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn prepare_fetch_log_requests_skips_pending() -> Result<()> {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = build_table_info(table_path.clone(), 1, 1);
+        let cluster = build_cluster_arc(&table_path, 1, 1);
+        let metadata = Arc::new(Metadata::new_for_test(cluster));
+        let status = Arc::new(LogScannerStatus::new());
+        status.assign_scan_bucket(TableBucket::new(1, 0), 0);
+        let fetcher = LogFetcher::new(
+            table_info,
+            Arc::new(RpcClient::new()),
+            metadata,
+            status,
+            None,
+        )?;
+
+        fetcher.nodes_with_pending_fetch_requests.lock().insert(1);
+
+        let requests = fetcher.prepare_fetch_log_requests().await;
+        assert!(requests.is_empty());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn handle_fetch_response_sets_error() -> Result<()> {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = build_table_info(table_path.clone(), 1, 1);
+        let cluster = build_cluster_arc(&table_path, 1, 1);
+        let metadata = Arc::new(Metadata::new_for_test(cluster));
+        let status = Arc::new(LogScannerStatus::new());
+        status.assign_scan_bucket(TableBucket::new(1, 0), 5);
+        let fetcher = LogFetcher::new(
+            table_info.clone(),
+            Arc::new(RpcClient::new()),
+            metadata.clone(),
+            status.clone(),
+            None,
+        )?;
+
+        let response = crate::proto::FetchLogResponse {
+            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
+                table_id: 1,
+                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
+                    partition_id: None,
+                    bucket_id: 0,
+                    error_code: 
Some(FlussError::AuthorizationException.code()),
+                    error_message: Some("denied".to_string()),
+                    high_watermark: None,
+                    log_start_offset: None,
+                    remote_log_fetch_info: None,
+                    records: None,
+                }],
+            }],
+        };
+
+        LogFetcher::handle_fetch_response(
+            response,
+            &fetcher.log_fetch_buffer,
+            &fetcher.log_scanner_status,
+            &fetcher.read_context,
+            &fetcher.remote_read_context,
+            &fetcher.remote_log_downloader,
+            &fetcher.credentials_cache,
+        )
+        .await;
+
+        let completed = fetcher.log_fetch_buffer.poll().expect("completed 
fetch");
+        let api_error = completed.api_error().expect("api error");
+        assert_eq!(api_error.code, FlussError::AuthorizationException.code());
+        Ok(())
+    }
+}
diff --git a/crates/fluss/src/client/write/sender.rs 
b/crates/fluss/src/client/write/sender.rs
index cb03a2c..ffac0af 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -455,7 +455,7 @@ mod tests {
     use crate::row::{Datum, GenericRow};
     use crate::rpc::FlussError;
     use crate::test_utils::build_cluster_arc;
-    use std::collections::HashSet;
+    use std::collections::{HashMap, HashSet};
 
     async fn build_ready_batch(
         accumulator: &RecordAccumulator,
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 0a368b7..368d8ab 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -99,6 +99,11 @@ pub enum Error {
     )]
     IoUnsupported { message: String },
 
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting wakeup error {}.", message)
+    )]
+    WakeupError { message: String },
     #[snafu(
         visibility(pub(crate)),
         display("Fluss hitting unsupported operation error {}.", message)
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 89fb7b9..c166ebe 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -17,7 +17,7 @@
 
 use crate::client::{Record, WriteRecord};
 use crate::compression::ArrowCompressionInfo;
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::metadata::DataType;
 use crate::record::{ChangeType, ScanRecord};
 use crate::row::{ColumnarRow, GenericRow};
@@ -446,7 +446,7 @@ impl LogRecordBatch {
     }
 
     pub fn ensure_valid(&self) -> Result<()> {
-        // todo
+        // TODO enable validation once checksum handling is corrected.
         Ok(())
     }
 
@@ -780,8 +780,10 @@ impl ReadContext {
         arrow_schema: SchemaRef,
         projected_fields: Vec<usize>,
         is_from_remote: bool,
-    ) -> ReadContext {
-        let target_schema = Self::project_schema(arrow_schema.clone(), 
projected_fields.as_slice());
+    ) -> Result<ReadContext> {
+        Self::validate_projection(&arrow_schema, projected_fields.as_slice())?;
+        let target_schema =
+            Self::project_schema(arrow_schema.clone(), 
projected_fields.as_slice())?;
         // the logic is little bit of hard to understand, to refactor it to 
follow
         // java side
         let (need_do_reorder, sorted_fields) = {
@@ -804,16 +806,20 @@ impl ReadContext {
                 // Calculate reordering indexes to transform from sorted order 
to user-requested order
                 let mut reordering_indexes = 
Vec::with_capacity(projected_fields.len());
                 for &original_idx in &projected_fields {
-                    let pos = sorted_fields
-                        .binary_search(&original_idx)
-                        .expect("projection index should exist in sorted 
list");
+                    let pos = 
sorted_fields.binary_search(&original_idx).map_err(|_| {
+                        Error::IllegalArgument {
+                            message: format!(
+                                "Projection index {original_idx} is invalid 
for the current schema."
+                            ),
+                        }
+                    })?;
                     reordering_indexes.push(pos);
                 }
                 Projection {
                     ordered_schema: Self::project_schema(
                         arrow_schema.clone(),
                         sorted_fields.as_slice(),
-                    ),
+                    )?,
                     projected_fields,
                     ordered_fields: sorted_fields,
                     reordering_indexes,
@@ -824,7 +830,7 @@ impl ReadContext {
                     ordered_schema: Self::project_schema(
                         arrow_schema.clone(),
                         projected_fields.as_slice(),
-                    ),
+                    )?,
                     ordered_fields: projected_fields.clone(),
                     projected_fields,
                     reordering_indexes: vec![],
@@ -833,21 +839,34 @@ impl ReadContext {
             }
         };
 
-        ReadContext {
+        Ok(ReadContext {
             target_schema,
             full_schema: arrow_schema,
             projection: Some(project),
             is_from_remote,
+        })
+    }
+
+    fn validate_projection(schema: &SchemaRef, projected_fields: &[usize]) -> 
Result<()> {
+        let field_count = schema.fields().len();
+        for &index in projected_fields {
+            if index >= field_count {
+                return Err(Error::IllegalArgument {
+                    message: format!(
+                        "Projection index {index} is out of bounds for schema 
with {field_count} fields."
+                    ),
+                });
+            }
         }
+        Ok(())
     }
 
-    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> 
SchemaRef {
-        // todo: handle the exception
-        SchemaRef::new(
-            schema
-                .project(projected_fields)
-                .expect("can't project schema"),
-        )
+    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> 
Result<SchemaRef> {
+        Ok(SchemaRef::new(schema.project(projected_fields).map_err(
+            |e| Error::IllegalArgument {
+                message: format!("Invalid projection: {e}"),
+            },
+        )?))
     }
 
     pub fn project_fields(&self) -> Option<&[usize]> {
@@ -1035,6 +1054,8 @@ pub struct MyVec<T>(pub StreamReader<T>);
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::error::Error;
+    use crate::metadata::DataField;
     use crate::metadata::DataTypes;
 
     #[test]
@@ -1207,6 +1228,18 @@ mod tests {
         );
     }
 
+    #[test]
+    fn projection_rejects_out_of_bounds_index() {
+        let row_type = DataTypes::row(vec![
+            DataField::new("id".to_string(), DataTypes::int(), None),
+            DataField::new("name".to_string(), DataTypes::string(), None),
+        ]);
+        let schema = to_arrow_schema(&row_type);
+        let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], 
false);
+
+        assert!(matches!(result, Err(Error::IllegalArgument { .. })));
+    }
+
     fn le_bytes(vals: &[u32]) -> Vec<u8> {
         let mut out = Vec::with_capacity(vals.len() * 4);
         for &v in vals {
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index c5a3f8e..94997e8 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -182,3 +182,65 @@ impl IntoIterator for ScanRecords {
             .into_iter()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use ::arrow::array::{Int32Array, RecordBatch};
+    use ::arrow::datatypes::{DataType, Field, Schema};
+    use std::sync::Arc;
+
+    fn make_row(values: Vec<i32>, row_id: usize) -> ColumnarRow {
+        let schema = Arc::new(Schema::new(vec![Field::new("v", 
DataType::Int32, false)]));
+        let batch = RecordBatch::try_new(schema, 
vec![Arc::new(Int32Array::from(values))])
+            .expect("record batch");
+        ColumnarRow::new_with_row_id(Arc::new(batch), row_id)
+    }
+
+    #[test]
+    fn change_type_round_trip() {
+        let cases = [
+            (ChangeType::AppendOnly, "+A", 0),
+            (ChangeType::Insert, "+I", 1),
+            (ChangeType::UpdateBefore, "-U", 2),
+            (ChangeType::UpdateAfter, "+U", 3),
+            (ChangeType::Delete, "-D", 4),
+        ];
+
+        for (change_type, short, byte) in cases {
+            assert_eq!(change_type.short_string(), short);
+            assert_eq!(change_type.to_byte_value(), byte);
+            assert_eq!(ChangeType::from_byte_value(byte).unwrap(), 
change_type);
+        }
+
+        let err = ChangeType::from_byte_value(9).unwrap_err();
+        assert!(err.contains("Unsupported byte value"));
+    }
+
+    #[test]
+    fn scan_records_counts_and_iterates() {
+        let bucket0 = TableBucket::new(1, 0);
+        let bucket1 = TableBucket::new(1, 1);
+        let record0 = ScanRecord::new(make_row(vec![10, 11], 0), 5, 7, 
ChangeType::Insert);
+        let record1 = ScanRecord::new(make_row(vec![10, 11], 1), 6, 8, 
ChangeType::Delete);
+
+        let mut records = HashMap::new();
+        records.insert(bucket0.clone(), vec![record0.clone(), 
record1.clone()]);
+
+        let scan_records = ScanRecords::new(records);
+        assert_eq!(scan_records.records(&bucket0).len(), 2);
+        assert!(scan_records.records(&bucket1).is_empty());
+        assert_eq!(scan_records.count(), 2);
+
+        let collected: Vec<_> = scan_records.into_iter().collect();
+        assert_eq!(collected.len(), 2);
+    }
+
+    #[test]
+    fn scan_record_default_values() {
+        let record = ScanRecord::new_default(make_row(vec![1], 0));
+        assert_eq!(record.offset(), -1);
+        assert_eq!(record.timestamp(), -1);
+        assert_eq!(record.change_type(), &ChangeType::Insert);
+    }
+}
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 31f0fdf..90437c1 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -166,3 +166,67 @@ impl InternalRow for ColumnarRow {
             .value(self.row_id)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{
+        BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, 
Float64Array, Int8Array,
+        Int16Array, Int32Array, Int64Array, StringArray,
+    };
+    use arrow::datatypes::{DataType, Field, Schema};
+
+    #[test]
+    fn columnar_row_reads_values() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("b", DataType::Boolean, false),
+            Field::new("i8", DataType::Int8, false),
+            Field::new("i16", DataType::Int16, false),
+            Field::new("i32", DataType::Int32, false),
+            Field::new("i64", DataType::Int64, false),
+            Field::new("f32", DataType::Float32, false),
+            Field::new("f64", DataType::Float64, false),
+            Field::new("s", DataType::Utf8, false),
+            Field::new("bin", DataType::Binary, false),
+            Field::new("char", DataType::FixedSizeBinary(2), false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(BooleanArray::from(vec![true])),
+                Arc::new(Int8Array::from(vec![1])),
+                Arc::new(Int16Array::from(vec![2])),
+                Arc::new(Int32Array::from(vec![3])),
+                Arc::new(Int64Array::from(vec![4])),
+                Arc::new(Float32Array::from(vec![1.25])),
+                Arc::new(Float64Array::from(vec![2.5])),
+                Arc::new(StringArray::from(vec!["hello"])),
+                Arc::new(BinaryArray::from(vec![b"data".as_slice()])),
+                Arc::new(
+                    FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+                        vec![Some(b"ab".as_slice())].into_iter(),
+                        2,
+                    )
+                    .expect("fixed array"),
+                ),
+            ],
+        )
+        .expect("record batch");
+
+        let mut row = ColumnarRow::new(Arc::new(batch));
+        assert_eq!(row.get_field_count(), 10);
+        assert!(row.get_boolean(0));
+        assert_eq!(row.get_byte(1), 1);
+        assert_eq!(row.get_short(2), 2);
+        assert_eq!(row.get_int(3), 3);
+        assert_eq!(row.get_long(4), 4);
+        assert_eq!(row.get_float(5), 1.25);
+        assert_eq!(row.get_double(6), 2.5);
+        assert_eq!(row.get_string(7), "hello");
+        assert_eq!(row.get_bytes(8), b"data");
+        assert_eq!(row.get_char(9, 2), "ab");
+        row.set_row_id(0);
+        assert_eq!(row.get_row_id(), 0);
+    }
+}
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs 
b/crates/fluss/src/rpc/message/list_offsets.rs
index 9ab1f14..fcecb41 100644
--- a/crates/fluss/src/rpc/message/list_offsets.rs
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -17,9 +17,9 @@
 
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 
-use crate::error::Error;
 use crate::error::Result as FlussResult;
-use crate::proto::ListOffsetsResponse;
+use crate::error::{Error, FlussError};
+use crate::proto::{ErrorResponse, ListOffsetsResponse};
 use crate::rpc::frame::ReadError;
 
 use crate::rpc::api_key::ApiKey;
@@ -108,22 +108,48 @@ impl ListOffsetsResponse {
         self.buckets_resp
             .iter()
             .map(|resp| {
-                if resp.error_code.is_some() {
-                    // todo: consider use another suitable error
-                    Err(Error::UnexpectedError {
+                if let Some(error_code) = resp.error_code
+                    && error_code != FlussError::None.code()
+                {
+                    let api_error = ErrorResponse {
+                        error_code,
+                        error_message: resp.error_message.clone(),
+                    }
+                    .into();
+                    return Err(Error::FlussAPIError { api_error });
+                }
+                // if no error msg, offset must exists
+                resp.offset
+                    .map(|offset| (resp.bucket_id, offset))
+                    .ok_or_else(|| Error::UnexpectedError {
                         message: format!(
-                            "Missing offset, error message: {}",
-                            resp.error_message
-                                .as_deref()
-                                .unwrap_or("unknown server exception")
+                            "Missing offset for bucket {} without error code.",
+                            resp.bucket_id
                         ),
                         source: None,
                     })
-                } else {
-                    // if no error msg, offset must exists
-                    Ok((resp.bucket_id, resp.offset.unwrap()))
-                }
             })
             .collect()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::proto::{ListOffsetsResponse, PbListOffsetsRespForBucket};
+
+    #[test]
+    fn offsets_returns_api_error_on_error_code() {
+        let response = ListOffsetsResponse {
+            buckets_resp: vec![PbListOffsetsRespForBucket {
+                bucket_id: 1,
+                error_code: Some(FlussError::TableNotExist.code()),
+                error_message: Some("missing".to_string()),
+                offset: None,
+            }],
+        };
+
+        let result = response.offsets();
+        assert!(matches!(result, Err(Error::FlussAPIError { .. })));
+    }
+}
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index d191615..30424e5 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -184,3 +184,57 @@ impl<S> Default for FairBucketStatusMap<S> {
         Self::new()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::Arc;
+
+    #[test]
+    fn fair_bucket_status_map_tracks_order_and_size() {
+        let bucket0 = TableBucket::new(1, 0);
+        let bucket1 = TableBucket::new(1, 1);
+
+        let mut map = FairBucketStatusMap::new();
+        map.update_and_move_to_end(bucket0.clone(), 10);
+        map.update_and_move_to_end(bucket1.clone(), 20);
+        assert_eq!(map.size(), 2);
+
+        let values: Vec<i32> = map
+            .bucket_status_values()
+            .into_iter()
+            .map(|value| **value)
+            .collect();
+        assert_eq!(values, vec![10, 20]);
+
+        map.move_to_end(bucket0.clone());
+        let values: Vec<i32> = map
+            .bucket_status_values()
+            .into_iter()
+            .map(|value| **value)
+            .collect();
+        assert_eq!(values, vec![20, 10]);
+    }
+
+    #[test]
+    fn fair_bucket_status_map_mutations() {
+        let bucket0 = TableBucket::new(1, 0);
+        let bucket1 = TableBucket::new(2, 1);
+
+        let mut map = FairBucketStatusMap::new();
+        let mut input = HashMap::new();
+        input.insert(bucket0.clone(), Arc::new(1));
+        input.insert(bucket1.clone(), Arc::new(2));
+        map.set(input);
+
+        assert!(map.contains(&bucket0));
+        assert!(map.contains(&bucket1));
+        assert_eq!(map.bucket_set().len(), 2);
+
+        map.remove(&bucket1);
+        assert_eq!(map.size(), 1);
+
+        map.clear();
+        assert_eq!(map.size(), 0);
+    }
+}

Reply via email to