This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 18af060 chore: Improve read path error handling logic (#143)
18af060 is described below
commit 18af060a70f12567ad56ff5dbacf3ca52f2abb75
Author: AlexZhao <[email protected]>
AuthorDate: Sat Jan 17 14:33:57 2026 +0800
chore: Improve read path error handling logic (#143)
---
crates/fluss/src/client/table/log_fetch_buffer.rs | 378 ++++++++++++-
crates/fluss/src/client/table/remote_log.rs | 2 +-
crates/fluss/src/client/table/scanner.rs | 628 +++++++++++++++++-----
crates/fluss/src/client/write/sender.rs | 2 +-
crates/fluss/src/error.rs | 5 +
crates/fluss/src/record/arrow.rs | 67 ++-
crates/fluss/src/record/mod.rs | 62 +++
crates/fluss/src/row/column.rs | 64 +++
crates/fluss/src/rpc/message/list_offsets.rs | 52 +-
crates/fluss/src/util/mod.rs | 54 ++
10 files changed, 1122 insertions(+), 192 deletions(-)
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index c55c994..fb6981f 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -18,7 +18,7 @@
use arrow::array::RecordBatch;
use parking_lot::Mutex;
-use crate::error::Result;
+use crate::error::{ApiError, Error, Result};
use crate::metadata::TableBucket;
use crate::record::{
LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext,
ScanRecord,
@@ -29,12 +29,38 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::Notify;
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum FetchErrorAction {
+ Ignore,
+ LogOffsetOutOfRange,
+ Authorization,
+ CorruptMessage,
+ Unexpected,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum FetchErrorLogLevel {
+ Debug,
+ Warn,
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct FetchErrorContext {
+ pub(crate) action: FetchErrorAction,
+ pub(crate) log_level: FetchErrorLogLevel,
+ pub(crate) log_message: String,
+}
+
/// Represents a completed fetch that can be consumed
pub trait CompletedFetch: Send + Sync {
fn table_bucket(&self) -> &TableBucket;
+ fn api_error(&self) -> Option<&ApiError>;
+ fn fetch_error_context(&self) -> Option<&FetchErrorContext>;
+ fn take_error(&mut self) -> Option<Error>;
fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
fn fetch_batches(&mut self, max_batches: usize) ->
Result<Vec<RecordBatch>>;
fn is_consumed(&self) -> bool;
+ fn records_read(&self) -> usize;
fn drain(&mut self);
fn size_in_bytes(&self) -> usize;
fn high_watermark(&self) -> i64;
@@ -52,6 +78,7 @@ pub trait PendingFetch: Send + Sync {
/// Thread-safe buffer for completed fetches
pub struct LogFetchBuffer {
+ read_context: ReadContext,
completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn
PendingFetch>>>>,
next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
@@ -60,8 +87,9 @@ pub struct LogFetchBuffer {
}
impl LogFetchBuffer {
- pub fn new() -> Self {
+ pub fn new(read_context: ReadContext) -> Self {
Self {
+ read_context,
completed_fetches: Mutex::new(VecDeque::new()),
pending_fetches: Mutex::new(HashMap::new()),
next_in_line_fetch: Mutex::new(None),
@@ -75,26 +103,28 @@ impl LogFetchBuffer {
self.completed_fetches.lock().is_empty()
}
- /// Wait for the buffer to become non-empty, with timeout
- /// Returns true if data became available, false if timeout
- pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+ /// Wait for the buffer to become non-empty, with timeout.
+ /// Returns true if data became available, false if timeout.
+ pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
let deadline = std::time::Instant::now() + timeout;
loop {
// Check if buffer is not empty
if !self.is_empty() {
- return true;
+ return Ok(true);
}
// Check if woken up
if self.woken_up.swap(false, Ordering::Acquire) {
- return true;
+ return Err(Error::WakeupError {
+ message: "The await operation was interrupted by
wakeup.".to_string(),
+ });
}
// Check if timeout
let now = std::time::Instant::now();
if now >= deadline {
- return false;
+ return Ok(false);
}
// Wait for notification with remaining time
@@ -102,7 +132,7 @@ impl LogFetchBuffer {
let notified = self.not_empty_notify.notified();
tokio::select! {
_ = tokio::time::sleep(remaining) => {
- return false; // Timeout
+ return Ok(false); // Timeout
}
_ = notified => {
// Got notification, check again
@@ -119,6 +149,26 @@ impl LogFetchBuffer {
self.not_empty_notify.notify_waiters();
}
+ pub(crate) fn add_api_error(
+ &self,
+ table_bucket: TableBucket,
+ api_error: ApiError,
+ fetch_error_context: FetchErrorContext,
+ fetch_offset: i64,
+ ) {
+ let error_fetch = DefaultCompletedFetch::from_api_error(
+ table_bucket,
+ api_error,
+ fetch_error_context,
+ fetch_offset,
+ self.read_context.clone(),
+ );
+ self.completed_fetches
+ .lock()
+ .push_back(Box::new(error_fetch));
+ self.not_empty_notify.notify_waiters();
+ }
+
/// Add a pending fetch to the buffer
pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
let table_bucket = pending_fetch.table_bucket().clone();
@@ -136,6 +186,7 @@ impl LogFetchBuffer {
// holding both locks simultaneously.
let mut completed_to_push: Vec<Box<dyn CompletedFetch>> = Vec::new();
let mut has_completed = false;
+ let mut pending_error: Option<Error> = None;
{
let mut pending_map = self.pending_fetches.lock();
if let Some(pendings) = pending_map.get_mut(table_bucket) {
@@ -148,8 +199,9 @@ impl LogFetchBuffer {
has_completed = true;
}
Err(e) => {
- // todo: handle exception?
- log::error!("Error when completing: {e}");
+ pending_error = Some(e);
+ has_completed = true;
+ break;
}
}
} else {
@@ -162,11 +214,22 @@ impl LogFetchBuffer {
}
}
+ if let Some(error) = pending_error {
+ let error_fetch = DefaultCompletedFetch::from_error(
+ table_bucket.clone(),
+ error,
+ -1,
+ self.read_context.clone(),
+ );
+ completed_to_push.push(Box::new(error_fetch));
+ }
+
if !completed_to_push.is_empty() {
let mut completed_queue = self.completed_fetches.lock();
for completed in completed_to_push {
completed_queue.push_back(completed);
}
+ has_completed = true;
}
if has_completed {
@@ -236,12 +299,6 @@ impl LogFetchBuffer {
}
}
-impl Default for LogFetchBuffer {
- fn default() -> Self {
- Self::new()
- }
-}
-
/// A wrapper that makes a completed fetch look like a pending fetch
struct CompletedPendingFetch {
completed_fetch: Box<dyn CompletedFetch>,
@@ -270,6 +327,9 @@ impl PendingFetch for CompletedPendingFetch {
/// Default implementation of CompletedFetch for in-memory log records
pub struct DefaultCompletedFetch {
table_bucket: TableBucket,
+ api_error: Option<ApiError>,
+ fetch_error_context: Option<FetchErrorContext>,
+ error: Option<Error>,
log_record_batch: LogRecordsBatches,
read_context: ReadContext,
next_fetch_offset: i64,
@@ -280,6 +340,9 @@ pub struct DefaultCompletedFetch {
records_read: usize,
current_record_iterator: Option<LogRecordIterator>,
current_record_batch: Option<LogRecordBatch>,
+ last_record: Option<ScanRecord>,
+ cached_record_error: Option<String>,
+ corrupt_last_record: bool,
}
impl DefaultCompletedFetch {
@@ -290,9 +353,12 @@ impl DefaultCompletedFetch {
read_context: ReadContext,
fetch_offset: i64,
high_watermark: i64,
- ) -> Result<Self> {
- Ok(Self {
+ ) -> Self {
+ Self {
table_bucket,
+ api_error: None,
+ fetch_error_context: None,
+ error: None,
log_record_batch,
read_context,
next_fetch_offset: fetch_offset,
@@ -303,7 +369,65 @@ impl DefaultCompletedFetch {
records_read: 0,
current_record_iterator: None,
current_record_batch: None,
- })
+ last_record: None,
+ cached_record_error: None,
+ corrupt_last_record: false,
+ }
+ }
+
+ pub(crate) fn from_error(
+ table_bucket: TableBucket,
+ error: Error,
+ fetch_offset: i64,
+ read_context: ReadContext,
+ ) -> Self {
+ Self {
+ table_bucket,
+ api_error: None,
+ fetch_error_context: None,
+ error: Some(error),
+ log_record_batch: LogRecordsBatches::new(Vec::new()),
+ read_context,
+ next_fetch_offset: fetch_offset,
+ high_watermark: -1,
+ size_in_bytes: 0,
+ consumed: false,
+ initialized: false,
+ records_read: 0,
+ current_record_iterator: None,
+ current_record_batch: None,
+ last_record: None,
+ cached_record_error: None,
+ corrupt_last_record: false,
+ }
+ }
+
+ pub(crate) fn from_api_error(
+ table_bucket: TableBucket,
+ api_error: ApiError,
+ fetch_error_context: FetchErrorContext,
+ fetch_offset: i64,
+ read_context: ReadContext,
+ ) -> Self {
+ Self {
+ table_bucket,
+ api_error: Some(api_error),
+ fetch_error_context: Some(fetch_error_context),
+ error: None,
+ log_record_batch: LogRecordsBatches::new(Vec::new()),
+ read_context,
+ next_fetch_offset: fetch_offset,
+ high_watermark: -1,
+ size_in_bytes: 0,
+ consumed: false,
+ initialized: false,
+ records_read: 0,
+ current_record_iterator: None,
+ current_record_batch: None,
+ last_record: None,
+ cached_record_error: None,
+ corrupt_last_record: false,
+ }
}
/// Get the next fetched record, handling batch iteration and record
skipping
@@ -330,6 +454,19 @@ impl DefaultCompletedFetch {
}
}
+ fn fetch_error(&self) -> Error {
+ let mut message = format!(
+ "Received exception when fetching the next record from
{table_bucket}. If needed, please back to past the record to continue
scanning.",
+ table_bucket = self.table_bucket
+ );
+ if let Some(cause) = self.cached_record_error.as_deref() {
+ message.push_str(&format!(" Cause: {cause}"));
+ }
+ Error::UnexpectedError {
+ message,
+ source: None,
+ }
+ }
/// Get the next batch directly without row iteration
fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
loop {
@@ -368,8 +505,36 @@ impl CompletedFetch for DefaultCompletedFetch {
&self.table_bucket
}
+ fn api_error(&self) -> Option<&ApiError> {
+ self.api_error.as_ref()
+ }
+
+ fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+ self.fetch_error_context.as_ref()
+ }
+
+ fn take_error(&mut self) -> Option<Error> {
+ self.error.take()
+ }
+
fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>
{
- // todo: handle corrupt_last_record
+ if let Some(error) = self.error.take() {
+ return Err(error);
+ }
+
+ if let Some(api_error) = self.api_error.as_ref() {
+ return Err(Error::FlussAPIError {
+ api_error: ApiError {
+ code: api_error.code,
+ message: api_error.message.clone(),
+ },
+ });
+ }
+
+ if self.corrupt_last_record {
+ return Err(self.fetch_error());
+ }
+
if self.consumed {
return Ok(Vec::new());
}
@@ -377,19 +542,53 @@ impl CompletedFetch for DefaultCompletedFetch {
let mut scan_records = Vec::new();
for _ in 0..max_records {
- if let Some(record) = self.next_fetched_record()? {
- self.next_fetch_offset = record.offset() + 1;
- self.records_read += 1;
- scan_records.push(record);
- } else {
- break;
+ if self.cached_record_error.is_none() {
+ self.corrupt_last_record = true;
+ match self.next_fetched_record() {
+ Ok(Some(record)) => {
+ self.corrupt_last_record = false;
+ self.last_record = Some(record);
+ }
+ Ok(None) => {
+ self.corrupt_last_record = false;
+ self.last_record = None;
+ }
+ Err(e) => {
+ self.cached_record_error = Some(e.to_string());
+ }
+ }
}
+
+ let Some(record) = self.last_record.take() else {
+ break;
+ };
+
+ self.next_fetch_offset = record.offset() + 1;
+ self.records_read += 1;
+ scan_records.push(record);
+ }
+
+ if self.cached_record_error.is_some() && scan_records.is_empty() {
+ return Err(self.fetch_error());
}
Ok(scan_records)
}
fn fetch_batches(&mut self, max_batches: usize) ->
Result<Vec<RecordBatch>> {
+ if let Some(error) = self.error.take() {
+ return Err(error);
+ }
+
+ if let Some(api_error) = self.api_error.as_ref() {
+ return Err(Error::FlussAPIError {
+ api_error: ApiError {
+ code: api_error.code,
+ message: api_error.message.clone(),
+ },
+ });
+ }
+
if self.consumed {
return Ok(Vec::new());
}
@@ -410,8 +609,18 @@ impl CompletedFetch for DefaultCompletedFetch {
self.consumed
}
+ fn records_read(&self) -> usize {
+ self.records_read
+ }
+
fn drain(&mut self) {
self.consumed = true;
+ self.api_error = None;
+ self.fetch_error_context = None;
+ self.error = None;
+ self.cached_record_error = None;
+ self.corrupt_last_record = false;
+ self.last_record = None;
}
fn size_in_bytes(&self) -> usize {
@@ -434,3 +643,118 @@ impl CompletedFetch for DefaultCompletedFetch {
self.next_fetch_offset
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::client::WriteRecord;
+ use crate::compression::{
+ ArrowCompressionInfo, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ };
+ use crate::metadata::{DataField, DataTypes, TablePath};
+ use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext,
to_arrow_schema};
+ use crate::row::GenericRow;
+ use std::sync::Arc;
+ use std::time::Duration;
+
+ fn test_read_context() -> ReadContext {
+ let row_type = DataTypes::row(vec![DataField::new(
+ "id".to_string(),
+ DataTypes::int(),
+ None,
+ )]);
+ ReadContext::new(to_arrow_schema(&row_type), false)
+ }
+
+ struct ErrorPendingFetch {
+ table_bucket: TableBucket,
+ }
+
+ impl PendingFetch for ErrorPendingFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ &self.table_bucket
+ }
+
+ fn is_completed(&self) -> bool {
+ true
+ }
+
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn
CompletedFetch>> {
+ Err(Error::UnexpectedError {
+ message: "pending fetch failure".to_string(),
+ source: None,
+ })
+ }
+ }
+
+ #[tokio::test]
+ async fn await_not_empty_returns_wakeup_error() {
+ let buffer = LogFetchBuffer::new(test_read_context());
+ buffer.wakeup();
+
+ let result = buffer.await_not_empty(Duration::from_millis(10)).await;
+ assert!(matches!(result, Err(Error::WakeupError { .. })));
+ }
+
+ #[tokio::test]
+ async fn await_not_empty_returns_pending_error() {
+ let buffer = LogFetchBuffer::new(test_read_context());
+ let table_bucket = TableBucket::new(1, 0);
+ buffer.pend(Box::new(ErrorPendingFetch {
+ table_bucket: table_bucket.clone(),
+ }));
+ buffer.try_complete(&table_bucket);
+
+ let result = buffer.await_not_empty(Duration::from_millis(10)).await;
+ assert!(matches!(result, Ok(true)));
+
+ let mut completed = buffer.poll().expect("completed fetch");
+ assert!(completed.take_error().is_some());
+ }
+
+ #[test]
+ fn default_completed_fetch_reads_records() -> Result<()> {
+ let row_type = DataTypes::row(vec![
+ DataField::new("id".to_string(), DataTypes::int(), None),
+ DataField::new("name".to_string(), DataTypes::string(), None),
+ ]);
+ let table_path = Arc::new(TablePath::new("db".to_string(),
"tbl".to_string()));
+
+ let mut builder = MemoryLogRecordsArrowBuilder::new(
+ 1,
+ &row_type,
+ false,
+ ArrowCompressionInfo {
+ compression_type: ArrowCompressionType::None,
+ compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ },
+ );
+
+ let mut row = GenericRow::new();
+ row.set_field(0, 1_i32);
+ row.set_field(1, "alice");
+ let record = WriteRecord::new(table_path, row);
+ builder.append(&record)?;
+
+ let data = builder.build()?;
+ let log_records = LogRecordsBatches::new(data.clone());
+ let read_context = ReadContext::new(to_arrow_schema(&row_type), false);
+ let mut fetch = DefaultCompletedFetch::new(
+ TableBucket::new(1, 0),
+ log_records,
+ data.len(),
+ read_context,
+ 0,
+ 0,
+ );
+
+ let records = fetch.fetch_records(10)?;
+ assert_eq!(records.len(), 1);
+ assert_eq!(records[0].offset(), 0);
+
+ let empty = fetch.fetch_records(10)?;
+ assert!(empty.is_empty());
+
+ Ok(())
+ }
+}
diff --git a/crates/fluss/src/client/table/remote_log.rs
b/crates/fluss/src/client/table/remote_log.rs
index d9abd19..0142515 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -409,7 +409,7 @@ impl PendingFetch for RemotePendingFetch {
self.read_context,
self.fetch_offset,
self.high_watermark,
- )?;
+ );
Ok(Box::new(completed_fetch))
}
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 7d22324..3e7d61f 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -17,7 +17,7 @@
use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
-use log::{debug, error, warn};
+use log::{debug, warn};
use parking_lot::{Mutex, RwLock};
use std::collections::{HashMap, HashSet};
use std::slice::from_ref;
@@ -29,16 +29,17 @@ use crate::client::connection::FlussConnection;
use crate::client::credentials::CredentialsCache;
use crate::client::metadata::Metadata;
use crate::client::table::log_fetch_buffer::{
- CompletedFetch, DefaultCompletedFetch, LogFetchBuffer,
+ CompletedFetch, DefaultCompletedFetch, FetchErrorAction,
FetchErrorContext, FetchErrorLogLevel,
+ LogFetchBuffer,
};
use crate::client::table::remote_log::{
RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
};
-use crate::error::{Error, Result, RpcError};
+use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{TableBucket, TableInfo, TablePath};
-use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket,
PbFetchLogReqForTable};
+use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket,
PbFetchLogReqForTable};
use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords,
to_arrow_schema};
-use crate::rpc::{RpcClient, message};
+use crate::rpc::{RpcClient, RpcError, message};
use crate::util::FairBucketStatusMap;
const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
@@ -318,7 +319,7 @@ impl LogScannerInner {
.log_fetcher
.log_fetch_buffer
.await_not_empty(remaining)
- .await;
+ .await?;
if !has_data {
// Timeout while waiting
@@ -396,7 +397,7 @@ impl LogScannerInner {
.log_fetcher
.log_fetch_buffer
.await_not_empty(remaining)
- .await;
+ .await?;
if !has_data {
return Ok(Vec::new());
@@ -448,6 +449,8 @@ impl RecordBatchLogScanner {
struct LogFetcher {
conns: Arc<RpcClient>,
metadata: Arc<Metadata>,
+ table_path: TablePath,
+ is_partitioned: bool,
log_scanner_status: Arc<LogScannerStatus>,
read_context: ReadContext,
remote_read_context: ReadContext,
@@ -457,8 +460,6 @@ struct LogFetcher {
credentials_cache: Arc<CredentialsCache>,
log_fetch_buffer: Arc<LogFetchBuffer>,
nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
- table_path: TablePath,
- is_partitioned: bool,
}
impl LogFetcher {
@@ -471,24 +472,25 @@ impl LogFetcher {
) -> Result<Self> {
let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
let read_context =
- Self::create_read_context(full_arrow_schema.clone(),
projected_fields.clone(), false);
+ Self::create_read_context(full_arrow_schema.clone(),
projected_fields.clone(), false)?;
let remote_read_context =
- Self::create_read_context(full_arrow_schema,
projected_fields.clone(), true);
+ Self::create_read_context(full_arrow_schema,
projected_fields.clone(), true)?;
let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
+ let log_fetch_buffer =
Arc::new(LogFetchBuffer::new(read_context.clone()));
Ok(LogFetcher {
conns: conns.clone(),
metadata: metadata.clone(),
+ table_path: table_info.table_path.clone(),
+ is_partitioned: table_info.is_partitioned(),
log_scanner_status,
read_context,
remote_read_context,
remote_log_downloader:
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
credentials_cache: Arc::new(CredentialsCache::new(conns.clone(),
metadata.clone())),
- log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+ log_fetch_buffer,
nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
- table_path: table_info.table_path.clone(),
- is_partitioned: table_info.is_partitioned(),
})
}
@@ -496,23 +498,79 @@ impl LogFetcher {
full_arrow_schema: SchemaRef,
projected_fields: Option<Vec<usize>>,
is_from_remote: bool,
- ) -> ReadContext {
+ ) -> Result<ReadContext> {
match projected_fields {
- None => ReadContext::new(full_arrow_schema, is_from_remote),
+ None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)),
Some(fields) => {
ReadContext::with_projection_pushdown(full_arrow_schema,
fields, is_from_remote)
}
}
}
- async fn check_and_update_metadata(&self) -> Result<()> {
- if self.is_partitioned {
- // TODO: Implement partition-aware metadata refresh for buckets
whose leaders are unknown.
- // The implementation will likely need to collect partition IDs
for such buckets and
- // perform targeted metadata updates. Until then, we avoid
computing unused partition_ids.
- return Ok(());
+ fn describe_fetch_error(
+ error: FlussError,
+ table_bucket: &TableBucket,
+ fetch_offset: i64,
+ error_message: &str,
+ ) -> FetchErrorContext {
+ match error {
+ FlussError::NotLeaderOrFollower
+ | FlussError::LogStorageException
+ | FlussError::KvStorageException
+ | FlussError::StorageException
+ | FlussError::FencedLeaderEpochException => FetchErrorContext {
+ action: FetchErrorAction::Ignore,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "Error in fetch for bucket {table_bucket}: {error:?}:
{error_message}"
+ ),
+ },
+ FlussError::UnknownTableOrBucketException => FetchErrorContext {
+ action: FetchErrorAction::Ignore,
+ log_level: FetchErrorLogLevel::Warn,
+ log_message: format!(
+ "Received unknown table or bucket error in fetch for
bucket {table_bucket}"
+ ),
+ },
+ FlussError::LogOffsetOutOfRangeException => FetchErrorContext {
+ action: FetchErrorAction::LogOffsetOutOfRange,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "The fetching offset {fetch_offset} is out of range for
bucket {table_bucket}: {error_message}"
+ ),
+ },
+ FlussError::AuthorizationException => FetchErrorContext {
+ action: FetchErrorAction::Authorization,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "Authorization error while fetching offset {fetch_offset}
for bucket {table_bucket}: {error_message}"
+ ),
+ },
+ FlussError::UnknownServerError => FetchErrorContext {
+ action: FetchErrorAction::Ignore,
+ log_level: FetchErrorLogLevel::Warn,
+ log_message: format!(
+ "Unknown server error while fetching offset {fetch_offset}
for bucket {table_bucket}: {error_message}"
+ ),
+ },
+ FlussError::CorruptMessage => FetchErrorContext {
+ action: FetchErrorAction::CorruptMessage,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "Encountered corrupt message when fetching offset
{fetch_offset} for bucket {table_bucket}: {error_message}"
+ ),
+ },
+ _ => FetchErrorContext {
+ action: FetchErrorAction::Unexpected,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "Unexpected error code {error:?} while fetching at offset
{fetch_offset} from bucket {table_bucket}: {error_message}"
+ ),
+ },
}
+ }
+ async fn check_and_update_metadata(&self) -> Result<()> {
let need_update = self
.fetchable_buckets()
.iter()
@@ -522,6 +580,26 @@ impl LogFetcher {
return Ok(());
}
+ if self.is_partitioned {
+ // Fallback to full table metadata refresh until partition-aware
updates are available.
+ self.metadata
+ .update_tables_metadata(&HashSet::from([&self.table_path]))
+ .await
+ .or_else(|e| {
+ if let Error::RpcError { source, .. } = &e
+ && matches!(source, RpcError::ConnectionError(_) |
RpcError::Poisoned(_))
+ {
+ warn!(
+ "Retrying after encountering error while updating
table metadata: {e}"
+ );
+ Ok(())
+ } else {
+ Err(e)
+ }
+ })?;
+ return Ok(());
+ }
+
// TODO: Handle PartitionNotExist error
self.metadata
.update_tables_metadata(&HashSet::from([&self.table_path]))
@@ -561,7 +639,6 @@ impl LogFetcher {
let creds_cache = self.credentials_cache.clone();
let nodes_with_pending =
self.nodes_with_pending_fetch_requests.clone();
let metadata = self.metadata.clone();
-
// Spawn async task to handle the fetch request
// Note: These tasks are not explicitly tracked or cancelled when
LogFetcher is dropped.
// This is acceptable because:
@@ -607,7 +684,7 @@ impl LogFetcher {
}
};
- if let Err(e) = Self::handle_fetch_response(
+ Self::handle_fetch_response(
fetch_response,
&log_fetch_buffer,
&log_scanner_status,
@@ -616,10 +693,7 @@ impl LogFetcher {
&remote_log_downloader,
&creds_cache,
)
- .await
- {
- error!("Fail to handle fetch response: {e:?}");
- }
+ .await;
});
}
@@ -644,7 +718,7 @@ impl LogFetcher {
remote_read_context: &ReadContext,
remote_log_downloader: &Arc<RemoteLogDownloader>,
credentials_cache: &Arc<CredentialsCache>,
- ) -> Result<()> {
+ ) {
for pb_fetch_log_resp in fetch_response.tables_resp {
let table_id = pb_fetch_log_resp.table_id;
let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
@@ -661,11 +735,45 @@ impl LogFetcher {
continue;
};
+ if let Some(error_code) = fetch_log_for_bucket.error_code
+ && error_code != FlussError::None.code()
+ {
+ let api_error: ApiError = ErrorResponse {
+ error_code,
+ error_message:
fetch_log_for_bucket.error_message.clone(),
+ }
+ .into();
+
+ let error = FlussError::for_code(error_code);
+ let error_context = Self::describe_fetch_error(
+ error,
+ &table_bucket,
+ fetch_offset,
+ api_error.message.as_str(),
+ );
+
log_scanner_status.move_bucket_to_end(table_bucket.clone());
+ match error_context.log_level {
+ FetchErrorLogLevel::Debug => {
+ debug!("{}", error_context.log_message);
+ }
+ FetchErrorLogLevel::Warn => {
+ warn!("{}", error_context.log_message);
+ }
+ }
+ log_fetch_buffer.add_api_error(
+ table_bucket.clone(),
+ api_error,
+ error_context,
+ fetch_offset,
+ );
+ continue;
+ }
+
// Check if this is a remote log fetch
if let Some(ref remote_log_fetch_info) =
fetch_log_for_bucket.remote_log_fetch_info
{
// set remote fs props
- let remote_fs_props =
credentials_cache.get_or_refresh().await?;
+ let remote_fs_props =
credentials_cache.get_or_refresh().await.unwrap();
remote_log_downloader.set_remote_fs_props(remote_fs_props);
let remote_fetch_info =
@@ -688,26 +796,18 @@ impl LogFetcher {
let size_in_bytes = records.len();
let log_record_batch = LogRecordsBatches::new(records);
- match DefaultCompletedFetch::new(
+ let completed_fetch = DefaultCompletedFetch::new(
table_bucket.clone(),
log_record_batch,
size_in_bytes,
read_context.clone(),
fetch_offset,
high_watermark,
- ) {
- Ok(completed_fetch) => {
- log_fetch_buffer.add(Box::new(completed_fetch));
- }
- Err(e) => {
- // todo: handle error
- log::warn!("Failed to create completed fetch:
{e:?}");
- }
- }
+ );
+ log_fetch_buffer.add(Box::new(completed_fetch));
}
}
}
- Ok(())
}
fn pending_remote_fetches(
@@ -763,69 +863,91 @@ impl LogFetcher {
let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
let mut records_remaining = MAX_POLL_RECORDS;
- while records_remaining > 0 {
- // Get the next in line fetch, or get a new one from buffer
- let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
-
- if next_in_line.is_none() ||
next_in_line.as_ref().unwrap().is_consumed() {
- // Get a new fetch from buffer
- if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
- // Initialize the fetch if not already initialized
- if !completed_fetch.is_initialized() {
- let size_in_bytes = completed_fetch.size_in_bytes();
- match self.initialize_fetch(completed_fetch) {
- Ok(initialized) => {
-
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
- continue;
+ let collect_result: Result<()> = {
+ while records_remaining > 0 {
+ // Get the next in line fetch, or get a new one from buffer
+ let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+ if next_in_line.is_none() ||
next_in_line.as_ref().unwrap().is_consumed() {
+ // Get a new fetch from buffer
+ if let Some(completed_fetch) =
self.log_fetch_buffer.poll() {
+ // Initialize the fetch if not already initialized
+ if !completed_fetch.is_initialized() {
+ let size_in_bytes =
completed_fetch.size_in_bytes();
+ match self.initialize_fetch(completed_fetch) {
+ Ok(initialized) => {
+
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+ continue;
+ }
+ Err(e) => {
+ // Remove a completedFetch upon a parse
with exception if
+ // (1) it contains no records, and
+ // (2) there are no fetched records with
actual content preceding this
+ // exception.
+ if result.is_empty() && size_in_bytes == 0
{
+ // todo: do we need to consider it
like java ?
+ // self.log_fetch_buffer.poll();
+ }
+ return Err(e);
+ }
}
+ } else {
+ self.log_fetch_buffer
+ .set_next_in_line_fetch(Some(completed_fetch));
+ }
+ // Note: poll() already removed the fetch from buffer,
so no need to call poll()
+ } else {
+ // No more fetches available
+ break;
+ }
+ } else {
+ // Fetch records from next_in_line
+ if let Some(mut next_fetch) = next_in_line {
+ let records = match self
+ .fetch_records_from_fetch(&mut next_fetch,
records_remaining)
+ {
+ Ok(records) => records,
Err(e) => {
- // Remove a completedFetch upon a parse with
exception if
- // (1) it contains no records, and
- // (2) there are no fetched records with
actual content preceding this
- // exception.
- if result.is_empty() && size_in_bytes == 0 {
- // todo: do we need to consider it like
java ?
- // self.log_fetch_buffer.poll();
+ if !next_fetch.is_consumed() {
+ self.log_fetch_buffer
+
.set_next_in_line_fetch(Some(next_fetch));
}
return Err(e);
}
+ };
+
+ if !records.is_empty() {
+ let table_bucket =
next_fetch.table_bucket().clone();
+ // Merge with existing records for this bucket
+ let existing =
result.entry(table_bucket).or_default();
+ let records_count = records.len();
+ existing.extend(records);
+
+ records_remaining =
records_remaining.saturating_sub(records_count);
}
- } else {
- self.log_fetch_buffer
- .set_next_in_line_fetch(Some(completed_fetch));
+
+ // If the fetch is not fully consumed, put it back for
the next round
+ if !next_fetch.is_consumed() {
+ self.log_fetch_buffer
+ .set_next_in_line_fetch(Some(next_fetch));
+ }
+ // If consumed, next_fetch will be dropped here (which
is correct)
}
- // Note: poll() already removed the fetch from buffer, so
no need to call poll()
- } else {
- // No more fetches available
- break;
}
- } else {
- // Fetch records from next_in_line
- if let Some(mut next_fetch) = next_in_line {
- let records =
- self.fetch_records_from_fetch(&mut next_fetch,
records_remaining)?;
-
- if !records.is_empty() {
- let table_bucket = next_fetch.table_bucket().clone();
- // Merge with existing records for this bucket
- let existing = result.entry(table_bucket).or_default();
- let records_count = records.len();
- existing.extend(records);
-
- records_remaining =
records_remaining.saturating_sub(records_count);
- }
+ }
+ Ok(())
+ };
- // If the fetch is not fully consumed, put it back for the
next round
- if !next_fetch.is_consumed() {
- self.log_fetch_buffer
- .set_next_in_line_fetch(Some(next_fetch));
- }
- // If consumed, next_fetch will be dropped here (which is
correct)
+ match collect_result {
+ Ok(()) => Ok(result),
+ Err(e) => {
+ if result.is_empty() {
+ Err(e)
+ } else {
+ Ok(result)
}
}
}
-
- Ok(result)
}
/// Initialize a completed fetch, checking offset match and updating high
watermark
@@ -833,12 +955,63 @@ impl LogFetcher {
&self,
mut completed_fetch: Box<dyn CompletedFetch>,
) -> Result<Option<Box<dyn CompletedFetch>>> {
- // todo: handle error in initialize fetch
- let table_bucket = completed_fetch.table_bucket();
+ if let Some(error) = completed_fetch.take_error() {
+ return Err(error);
+ }
+
+ let table_bucket = completed_fetch.table_bucket().clone();
let fetch_offset = completed_fetch.next_fetch_offset();
+ if let Some(api_error) = completed_fetch.api_error() {
+ let error = FlussError::for_code(api_error.code);
+ let error_message = api_error.message.as_str();
+ self.log_scanner_status
+ .move_bucket_to_end(table_bucket.clone());
+ let action = completed_fetch
+ .fetch_error_context()
+ .map(|context| context.action)
+ .unwrap_or(FetchErrorAction::Unexpected);
+ match action {
+ FetchErrorAction::Ignore => {
+ return Ok(None);
+ }
+ FetchErrorAction::LogOffsetOutOfRange => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "The fetching offset {fetch_offset} is out of
range: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ FetchErrorAction::Authorization => {
+ return Err(Error::FlussAPIError {
+ api_error: ApiError {
+ code: api_error.code,
+ message: api_error.message.to_string(),
+ },
+ });
+ }
+ FetchErrorAction::CorruptMessage => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Encountered corrupt message when fetching offset
{fetch_offset} for bucket {table_bucket}: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ FetchErrorAction::Unexpected => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Unexpected error code {error:?} while fetching at
offset {fetch_offset} from bucket {table_bucket}: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ }
+ }
+
// Check if bucket is still subscribed
- let Some(current_offset) =
self.log_scanner_status.get_bucket_offset(table_bucket) else {
+ let Some(current_offset) =
self.log_scanner_status.get_bucket_offset(&table_bucket) else {
warn!(
"Discarding stale fetch response for bucket {table_bucket:?}
since the bucket has been unsubscribed"
);
@@ -857,7 +1030,7 @@ impl LogFetcher {
let high_watermark = completed_fetch.high_watermark();
if high_watermark >= 0 {
self.log_scanner_status
- .update_high_watermark(table_bucket, high_watermark);
+ .update_high_watermark(&table_bucket, high_watermark);
}
completed_fetch.set_initialized();
@@ -894,6 +1067,11 @@ impl LogFetcher {
.update_offset(&table_bucket, next_fetch_offset);
}
+ if next_in_line_fetch.is_consumed() &&
next_in_line_fetch.records_read() > 0 {
+ self.log_scanner_status
+ .move_bucket_to_end(table_bucket.clone());
+ }
+
Ok(records)
} else {
// These records aren't next in line, ignore them
@@ -915,58 +1093,70 @@ impl LogFetcher {
let mut batches_remaining = MAX_BATCHES;
let mut bytes_consumed: usize = 0;
- while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
- let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+ let collect_result: Result<()> = {
+ while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+ let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
- match next_in_line {
- Some(mut next_fetch) if !next_fetch.is_consumed() => {
- let batches =
- self.fetch_batches_from_fetch(&mut next_fetch,
batches_remaining)?;
- let batch_count = batches.len();
+ match next_in_line {
+ Some(mut next_fetch) if !next_fetch.is_consumed() => {
+ let batches =
+ self.fetch_batches_from_fetch(&mut next_fetch,
batches_remaining)?;
+ let batch_count = batches.len();
- if !batches.is_empty() {
- // Track bytes consumed (soft cap - may exceed by one
fetch)
- let batch_bytes: usize =
- batches.iter().map(|b|
b.get_array_memory_size()).sum();
- bytes_consumed += batch_bytes;
+ if !batches.is_empty() {
+ // Track bytes consumed (soft cap - may exceed by
one fetch)
+ let batch_bytes: usize =
+ batches.iter().map(|b|
b.get_array_memory_size()).sum();
+ bytes_consumed += batch_bytes;
- result.extend(batches);
- batches_remaining =
batches_remaining.saturating_sub(batch_count);
- }
+ result.extend(batches);
+ batches_remaining =
batches_remaining.saturating_sub(batch_count);
+ }
- if !next_fetch.is_consumed() {
- self.log_fetch_buffer
- .set_next_in_line_fetch(Some(next_fetch));
+ if !next_fetch.is_consumed() {
+ self.log_fetch_buffer
+ .set_next_in_line_fetch(Some(next_fetch));
+ }
}
- }
- _ => {
- if let Some(completed_fetch) =
self.log_fetch_buffer.poll() {
- if !completed_fetch.is_initialized() {
- let size_in_bytes =
completed_fetch.size_in_bytes();
- match self.initialize_fetch(completed_fetch) {
- Ok(initialized) => {
-
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
- continue;
- }
- Err(e) => {
- if result.is_empty() && size_in_bytes == 0
{
+ _ => {
+ if let Some(completed_fetch) =
self.log_fetch_buffer.poll() {
+ if !completed_fetch.is_initialized() {
+ let size_in_bytes =
completed_fetch.size_in_bytes();
+ match self.initialize_fetch(completed_fetch) {
+ Ok(initialized) => {
+
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
continue;
}
- return Err(e);
+ Err(e) => {
+ if result.is_empty() && size_in_bytes
== 0 {
+ continue;
+ }
+ return Err(e);
+ }
}
+ } else {
+ self.log_fetch_buffer
+
.set_next_in_line_fetch(Some(completed_fetch));
}
} else {
- self.log_fetch_buffer
- .set_next_in_line_fetch(Some(completed_fetch));
+ break;
}
- } else {
- break;
}
}
}
- }
+ Ok(())
+ };
- Ok(result)
+ match collect_result {
+ Ok(()) => Ok(result),
+ Err(e) => {
+ if result.is_empty() {
+ Err(e)
+ } else {
+ Ok(result)
+ }
+ }
+ }
}
fn fetch_batches_from_fetch(
@@ -1231,3 +1421,175 @@ impl BucketScanStatus {
*self.high_watermark.write() = high_watermark
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::client::WriteRecord;
+ use crate::client::metadata::Metadata;
+ use crate::compression::{
+ ArrowCompressionInfo, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ };
+ use crate::metadata::{TableInfo, TablePath};
+ use crate::record::MemoryLogRecordsArrowBuilder;
+ use crate::row::{Datum, GenericRow};
+ use crate::rpc::FlussError;
+ use crate::test_utils::{build_cluster_arc, build_table_info};
+
+ fn build_records(table_info: &TableInfo, table_path: Arc<TablePath>) ->
Result<Vec<u8>> {
+ let mut builder = MemoryLogRecordsArrowBuilder::new(
+ 1,
+ table_info.get_row_type(),
+ false,
+ ArrowCompressionInfo {
+ compression_type: ArrowCompressionType::None,
+ compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ },
+ );
+ let record = WriteRecord::new(
+ table_path,
+ GenericRow {
+ values: vec![Datum::Int32(1)],
+ },
+ );
+ builder.append(&record)?;
+ builder.build()
+ }
+
+ #[tokio::test]
+ async fn collect_fetches_updates_offset() -> Result<()> {
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = build_table_info(table_path.clone(), 1, 1);
+ let cluster = build_cluster_arc(&table_path, 1, 1);
+ let metadata = Arc::new(Metadata::new_for_test(cluster));
+ let status = Arc::new(LogScannerStatus::new());
+ let fetcher = LogFetcher::new(
+ table_info.clone(),
+ Arc::new(RpcClient::new()),
+ metadata,
+ status.clone(),
+ None,
+ )?;
+
+ let bucket = TableBucket::new(1, 0);
+ status.assign_scan_bucket(bucket.clone(), 0);
+
+ let data = build_records(&table_info, Arc::new(table_path))?;
+ let log_records = LogRecordsBatches::new(data.clone());
+ let read_context =
ReadContext::new(to_arrow_schema(table_info.get_row_type()), false);
+ let completed =
+ DefaultCompletedFetch::new(bucket.clone(), log_records,
data.len(), read_context, 0, 0);
+ fetcher.log_fetch_buffer.add(Box::new(completed));
+
+ let fetched = fetcher.collect_fetches()?;
+ assert_eq!(fetched.get(&bucket).unwrap().len(), 1);
+ assert_eq!(status.get_bucket_offset(&bucket), Some(1));
+ Ok(())
+ }
+
+ #[test]
+ fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> {
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = build_table_info(table_path.clone(), 1, 1);
+ let cluster = build_cluster_arc(&table_path, 1, 1);
+ let metadata = Arc::new(Metadata::new_for_test(cluster));
+ let status = Arc::new(LogScannerStatus::new());
+ let fetcher = LogFetcher::new(
+ table_info.clone(),
+ Arc::new(RpcClient::new()),
+ metadata,
+ status,
+ None,
+ )?;
+
+ let bucket = TableBucket::new(1, 0);
+ let data = build_records(&table_info, Arc::new(table_path))?;
+ let log_records = LogRecordsBatches::new(data.clone());
+ let read_context =
ReadContext::new(to_arrow_schema(table_info.get_row_type()), false);
+ let mut completed: Box<dyn CompletedFetch> =
Box::new(DefaultCompletedFetch::new(
+ bucket,
+ log_records,
+ data.len(),
+ read_context,
+ 0,
+ 0,
+ ));
+
+ let records = fetcher.fetch_records_from_fetch(&mut completed, 10)?;
+ assert!(records.is_empty());
+ assert!(completed.is_consumed());
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn prepare_fetch_log_requests_skips_pending() -> Result<()> {
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = build_table_info(table_path.clone(), 1, 1);
+ let cluster = build_cluster_arc(&table_path, 1, 1);
+ let metadata = Arc::new(Metadata::new_for_test(cluster));
+ let status = Arc::new(LogScannerStatus::new());
+ status.assign_scan_bucket(TableBucket::new(1, 0), 0);
+ let fetcher = LogFetcher::new(
+ table_info,
+ Arc::new(RpcClient::new()),
+ metadata,
+ status,
+ None,
+ )?;
+
+ fetcher.nodes_with_pending_fetch_requests.lock().insert(1);
+
+ let requests = fetcher.prepare_fetch_log_requests().await;
+ assert!(requests.is_empty());
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn handle_fetch_response_sets_error() -> Result<()> {
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = build_table_info(table_path.clone(), 1, 1);
+ let cluster = build_cluster_arc(&table_path, 1, 1);
+ let metadata = Arc::new(Metadata::new_for_test(cluster));
+ let status = Arc::new(LogScannerStatus::new());
+ status.assign_scan_bucket(TableBucket::new(1, 0), 5);
+ let fetcher = LogFetcher::new(
+ table_info.clone(),
+ Arc::new(RpcClient::new()),
+ metadata.clone(),
+ status.clone(),
+ None,
+ )?;
+
+ let response = crate::proto::FetchLogResponse {
+ tables_resp: vec![crate::proto::PbFetchLogRespForTable {
+ table_id: 1,
+ buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
+ partition_id: None,
+ bucket_id: 0,
+ error_code:
Some(FlussError::AuthorizationException.code()),
+ error_message: Some("denied".to_string()),
+ high_watermark: None,
+ log_start_offset: None,
+ remote_log_fetch_info: None,
+ records: None,
+ }],
+ }],
+ };
+
+ LogFetcher::handle_fetch_response(
+ response,
+ &fetcher.log_fetch_buffer,
+ &fetcher.log_scanner_status,
+ &fetcher.read_context,
+ &fetcher.remote_read_context,
+ &fetcher.remote_log_downloader,
+ &fetcher.credentials_cache,
+ )
+ .await;
+
+ let completed = fetcher.log_fetch_buffer.poll().expect("completed
fetch");
+ let api_error = completed.api_error().expect("api error");
+ assert_eq!(api_error.code, FlussError::AuthorizationException.code());
+ Ok(())
+ }
+}
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index cb03a2c..ffac0af 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -455,7 +455,7 @@ mod tests {
use crate::row::{Datum, GenericRow};
use crate::rpc::FlussError;
use crate::test_utils::build_cluster_arc;
- use std::collections::HashSet;
+ use std::collections::{HashMap, HashSet};
async fn build_ready_batch(
accumulator: &RecordAccumulator,
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 0a368b7..368d8ab 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -99,6 +99,11 @@ pub enum Error {
)]
IoUnsupported { message: String },
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting wakeup error {}.", message)
+ )]
+ WakeupError { message: String },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting unsupported operation error {}.", message)
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 89fb7b9..c166ebe 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -17,7 +17,7 @@
use crate::client::{Record, WriteRecord};
use crate::compression::ArrowCompressionInfo;
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::metadata::DataType;
use crate::record::{ChangeType, ScanRecord};
use crate::row::{ColumnarRow, GenericRow};
@@ -446,7 +446,7 @@ impl LogRecordBatch {
}
pub fn ensure_valid(&self) -> Result<()> {
- // todo
+ // TODO enable validation once checksum handling is corrected.
Ok(())
}
@@ -780,8 +780,10 @@ impl ReadContext {
arrow_schema: SchemaRef,
projected_fields: Vec<usize>,
is_from_remote: bool,
- ) -> ReadContext {
- let target_schema = Self::project_schema(arrow_schema.clone(),
projected_fields.as_slice());
+ ) -> Result<ReadContext> {
+ Self::validate_projection(&arrow_schema, projected_fields.as_slice())?;
+ let target_schema =
+ Self::project_schema(arrow_schema.clone(),
projected_fields.as_slice())?;
// the logic is little bit of hard to understand, to refactor it to
follow
// java side
let (need_do_reorder, sorted_fields) = {
@@ -804,16 +806,20 @@ impl ReadContext {
// Calculate reordering indexes to transform from sorted order
to user-requested order
let mut reordering_indexes =
Vec::with_capacity(projected_fields.len());
for &original_idx in &projected_fields {
- let pos = sorted_fields
- .binary_search(&original_idx)
- .expect("projection index should exist in sorted
list");
+ let pos =
sorted_fields.binary_search(&original_idx).map_err(|_| {
+ Error::IllegalArgument {
+ message: format!(
+ "Projection index {original_idx} is invalid
for the current schema."
+ ),
+ }
+ })?;
reordering_indexes.push(pos);
}
Projection {
ordered_schema: Self::project_schema(
arrow_schema.clone(),
sorted_fields.as_slice(),
- ),
+ )?,
projected_fields,
ordered_fields: sorted_fields,
reordering_indexes,
@@ -824,7 +830,7 @@ impl ReadContext {
ordered_schema: Self::project_schema(
arrow_schema.clone(),
projected_fields.as_slice(),
- ),
+ )?,
ordered_fields: projected_fields.clone(),
projected_fields,
reordering_indexes: vec![],
@@ -833,21 +839,34 @@ impl ReadContext {
}
};
- ReadContext {
+ Ok(ReadContext {
target_schema,
full_schema: arrow_schema,
projection: Some(project),
is_from_remote,
+ })
+ }
+
+ fn validate_projection(schema: &SchemaRef, projected_fields: &[usize]) ->
Result<()> {
+ let field_count = schema.fields().len();
+ for &index in projected_fields {
+ if index >= field_count {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Projection index {index} is out of bounds for schema
with {field_count} fields."
+ ),
+ });
+ }
}
+ Ok(())
}
- pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) ->
SchemaRef {
- // todo: handle the exception
- SchemaRef::new(
- schema
- .project(projected_fields)
- .expect("can't project schema"),
- )
+ pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) ->
Result<SchemaRef> {
+ Ok(SchemaRef::new(schema.project(projected_fields).map_err(
+ |e| Error::IllegalArgument {
+ message: format!("Invalid projection: {e}"),
+ },
+ )?))
}
pub fn project_fields(&self) -> Option<&[usize]> {
@@ -1035,6 +1054,8 @@ pub struct MyVec<T>(pub StreamReader<T>);
#[cfg(test)]
mod tests {
use super::*;
+ use crate::error::Error;
+ use crate::metadata::DataField;
use crate::metadata::DataTypes;
#[test]
@@ -1207,6 +1228,18 @@ mod tests {
);
}
+ #[test]
+ fn projection_rejects_out_of_bounds_index() {
+ let row_type = DataTypes::row(vec![
+ DataField::new("id".to_string(), DataTypes::int(), None),
+ DataField::new("name".to_string(), DataTypes::string(), None),
+ ]);
+ let schema = to_arrow_schema(&row_type);
+ let result = ReadContext::with_projection_pushdown(schema, vec![0, 2],
false);
+
+ assert!(matches!(result, Err(Error::IllegalArgument { .. })));
+ }
+
fn le_bytes(vals: &[u32]) -> Vec<u8> {
let mut out = Vec::with_capacity(vals.len() * 4);
for &v in vals {
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index c5a3f8e..94997e8 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -182,3 +182,65 @@ impl IntoIterator for ScanRecords {
.into_iter()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use ::arrow::array::{Int32Array, RecordBatch};
+ use ::arrow::datatypes::{DataType, Field, Schema};
+ use std::sync::Arc;
+
+ fn make_row(values: Vec<i32>, row_id: usize) -> ColumnarRow {
+ let schema = Arc::new(Schema::new(vec![Field::new("v",
DataType::Int32, false)]));
+ let batch = RecordBatch::try_new(schema,
vec![Arc::new(Int32Array::from(values))])
+ .expect("record batch");
+ ColumnarRow::new_with_row_id(Arc::new(batch), row_id)
+ }
+
+ #[test]
+ fn change_type_round_trip() {
+ let cases = [
+ (ChangeType::AppendOnly, "+A", 0),
+ (ChangeType::Insert, "+I", 1),
+ (ChangeType::UpdateBefore, "-U", 2),
+ (ChangeType::UpdateAfter, "+U", 3),
+ (ChangeType::Delete, "-D", 4),
+ ];
+
+ for (change_type, short, byte) in cases {
+ assert_eq!(change_type.short_string(), short);
+ assert_eq!(change_type.to_byte_value(), byte);
+ assert_eq!(ChangeType::from_byte_value(byte).unwrap(),
change_type);
+ }
+
+ let err = ChangeType::from_byte_value(9).unwrap_err();
+ assert!(err.contains("Unsupported byte value"));
+ }
+
+ #[test]
+ fn scan_records_counts_and_iterates() {
+ let bucket0 = TableBucket::new(1, 0);
+ let bucket1 = TableBucket::new(1, 1);
+ let record0 = ScanRecord::new(make_row(vec![10, 11], 0), 5, 7,
ChangeType::Insert);
+ let record1 = ScanRecord::new(make_row(vec![10, 11], 1), 6, 8,
ChangeType::Delete);
+
+ let mut records = HashMap::new();
+ records.insert(bucket0.clone(), vec![record0.clone(),
record1.clone()]);
+
+ let scan_records = ScanRecords::new(records);
+ assert_eq!(scan_records.records(&bucket0).len(), 2);
+ assert!(scan_records.records(&bucket1).is_empty());
+ assert_eq!(scan_records.count(), 2);
+
+ let collected: Vec<_> = scan_records.into_iter().collect();
+ assert_eq!(collected.len(), 2);
+ }
+
+ #[test]
+ fn scan_record_default_values() {
+ let record = ScanRecord::new_default(make_row(vec![1], 0));
+ assert_eq!(record.offset(), -1);
+ assert_eq!(record.timestamp(), -1);
+ assert_eq!(record.change_type(), &ChangeType::Insert);
+ }
+}
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 31f0fdf..90437c1 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -166,3 +166,67 @@ impl InternalRow for ColumnarRow {
.value(self.row_id)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::array::{
+ BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array,
Float64Array, Int8Array,
+ Int16Array, Int32Array, Int64Array, StringArray,
+ };
+ use arrow::datatypes::{DataType, Field, Schema};
+
+ #[test]
+ fn columnar_row_reads_values() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("b", DataType::Boolean, false),
+ Field::new("i8", DataType::Int8, false),
+ Field::new("i16", DataType::Int16, false),
+ Field::new("i32", DataType::Int32, false),
+ Field::new("i64", DataType::Int64, false),
+ Field::new("f32", DataType::Float32, false),
+ Field::new("f64", DataType::Float64, false),
+ Field::new("s", DataType::Utf8, false),
+ Field::new("bin", DataType::Binary, false),
+ Field::new("char", DataType::FixedSizeBinary(2), false),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(BooleanArray::from(vec![true])),
+ Arc::new(Int8Array::from(vec![1])),
+ Arc::new(Int16Array::from(vec![2])),
+ Arc::new(Int32Array::from(vec![3])),
+ Arc::new(Int64Array::from(vec![4])),
+ Arc::new(Float32Array::from(vec![1.25])),
+ Arc::new(Float64Array::from(vec![2.5])),
+ Arc::new(StringArray::from(vec!["hello"])),
+ Arc::new(BinaryArray::from(vec![b"data".as_slice()])),
+ Arc::new(
+ FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+ vec![Some(b"ab".as_slice())].into_iter(),
+ 2,
+ )
+ .expect("fixed array"),
+ ),
+ ],
+ )
+ .expect("record batch");
+
+ let mut row = ColumnarRow::new(Arc::new(batch));
+ assert_eq!(row.get_field_count(), 10);
+ assert!(row.get_boolean(0));
+ assert_eq!(row.get_byte(1), 1);
+ assert_eq!(row.get_short(2), 2);
+ assert_eq!(row.get_int(3), 3);
+ assert_eq!(row.get_long(4), 4);
+ assert_eq!(row.get_float(5), 1.25);
+ assert_eq!(row.get_double(6), 2.5);
+ assert_eq!(row.get_string(7), "hello");
+ assert_eq!(row.get_bytes(8), b"data");
+ assert_eq!(row.get_char(9, 2), "ab");
+ row.set_row_id(0);
+ assert_eq!(row.get_row_id(), 0);
+ }
+}
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs
b/crates/fluss/src/rpc/message/list_offsets.rs
index 9ab1f14..fcecb41 100644
--- a/crates/fluss/src/rpc/message/list_offsets.rs
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -17,9 +17,9 @@
use crate::{impl_read_version_type, impl_write_version_type, proto};
-use crate::error::Error;
use crate::error::Result as FlussResult;
-use crate::proto::ListOffsetsResponse;
+use crate::error::{Error, FlussError};
+use crate::proto::{ErrorResponse, ListOffsetsResponse};
use crate::rpc::frame::ReadError;
use crate::rpc::api_key::ApiKey;
@@ -108,22 +108,48 @@ impl ListOffsetsResponse {
self.buckets_resp
.iter()
.map(|resp| {
- if resp.error_code.is_some() {
- // todo: consider use another suitable error
- Err(Error::UnexpectedError {
+ if let Some(error_code) = resp.error_code
+ && error_code != FlussError::None.code()
+ {
+ let api_error = ErrorResponse {
+ error_code,
+ error_message: resp.error_message.clone(),
+ }
+ .into();
+ return Err(Error::FlussAPIError { api_error });
+ }
+ // if no error msg, offset must exist
+ resp.offset
+ .map(|offset| (resp.bucket_id, offset))
+ .ok_or_else(|| Error::UnexpectedError {
message: format!(
- "Missing offset, error message: {}",
- resp.error_message
- .as_deref()
- .unwrap_or("unknown server exception")
+ "Missing offset for bucket {} without error code.",
+ resp.bucket_id
),
source: None,
})
- } else {
- // if no error msg, offset must exists
- Ok((resp.bucket_id, resp.offset.unwrap()))
- }
})
.collect()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::proto::{ListOffsetsResponse, PbListOffsetsRespForBucket};
+
+ #[test]
+ fn offsets_returns_api_error_on_error_code() {
+ let response = ListOffsetsResponse {
+ buckets_resp: vec![PbListOffsetsRespForBucket {
+ bucket_id: 1,
+ error_code: Some(FlussError::TableNotExist.code()),
+ error_message: Some("missing".to_string()),
+ offset: None,
+ }],
+ };
+
+ let result = response.offsets();
+ assert!(matches!(result, Err(Error::FlussAPIError { .. })));
+ }
+}
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index d191615..30424e5 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -184,3 +184,57 @@ impl<S> Default for FairBucketStatusMap<S> {
Self::new()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::Arc;
+
+ #[test]
+ fn fair_bucket_status_map_tracks_order_and_size() {
+ let bucket0 = TableBucket::new(1, 0);
+ let bucket1 = TableBucket::new(1, 1);
+
+ let mut map = FairBucketStatusMap::new();
+ map.update_and_move_to_end(bucket0.clone(), 10);
+ map.update_and_move_to_end(bucket1.clone(), 20);
+ assert_eq!(map.size(), 2);
+
+ let values: Vec<i32> = map
+ .bucket_status_values()
+ .into_iter()
+ .map(|value| **value)
+ .collect();
+ assert_eq!(values, vec![10, 20]);
+
+ map.move_to_end(bucket0.clone());
+ let values: Vec<i32> = map
+ .bucket_status_values()
+ .into_iter()
+ .map(|value| **value)
+ .collect();
+ assert_eq!(values, vec![20, 10]);
+ }
+
+ #[test]
+ fn fair_bucket_status_map_mutations() {
+ let bucket0 = TableBucket::new(1, 0);
+ let bucket1 = TableBucket::new(2, 1);
+
+ let mut map = FairBucketStatusMap::new();
+ let mut input = HashMap::new();
+ input.insert(bucket0.clone(), Arc::new(1));
+ input.insert(bucket1.clone(), Arc::new(2));
+ map.set(input);
+
+ assert!(map.contains(&bucket0));
+ assert!(map.contains(&bucket1));
+ assert_eq!(map.bucket_set().len(), 2);
+
+ map.remove(&bucket1);
+ assert_eq!(map.size(), 1);
+
+ map.clear();
+ assert_eq!(map.size(), 0);
+ }
+}