This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5574d55 chore: Reject table scan for INDEXED format or KV table (#238)
5574d55 is described below
commit 5574d55de4a443ba1a1e3d17c3e550e86c6ca041
Author: Keith Lee <[email protected]>
AuthorDate: Wed Feb 4 12:45:08 2026 +0000
chore: Reject table scan for INDEXED format or KV table (#238)
---
bindings/cpp/src/lib.rs | 55 +++++++++---------
crates/fluss/src/client/table/scanner.rs | 97 +++++++++++++++++++++++++++++++-
crates/fluss/src/metadata/table.rs | 11 ++++
3 files changed, 132 insertions(+), 31 deletions(-)
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 7ae6416..ab02c8d 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -591,10 +591,10 @@ impl Table {
self.table_info.clone(),
);
- let scanner = match fluss_table.new_scan().create_log_scanner() {
- Ok(a) => a,
- Err(e) => return Err(format!("Failed to create log scanner:
{e}")),
- };
+ let scanner = fluss_table
+ .new_scan()
+ .create_log_scanner()
+ .map_err(|e| format!("Failed to create log scanner: {e}"))?;
let scanner_ptr = Box::into_raw(Box::new(LogScanner {
inner: Some(scanner),
@@ -616,17 +616,15 @@ impl Table {
self.table_info.clone(),
);
- let scan = fluss_table.new_scan();
- let scan = match scan.project(&column_indices) {
- Ok(s) => s,
- Err(e) => return Err(format!("Failed to project columns:
{e}")),
- };
- let scanner = match scan.create_log_scanner() {
- Ok(a) => a,
- Err(e) => return Err(format!("Failed to create log scanner:
{e}")),
- };
+ let log_scanner = fluss_table
+ .new_scan()
+ .project(&column_indices)
+ .map_err(|e| format!("Failed to project columns: {e}"))?
+ .create_log_scanner()
+ .map_err(|e| format!("Failed to create log scanner: {e}"))?;
+
let scanner = Box::into_raw(Box::new(LogScanner {
- inner: Some(scanner),
+ inner: Some(log_scanner),
inner_batch: None,
}));
Ok(scanner)
@@ -641,13 +639,14 @@ impl Table {
self.table_info.clone(),
);
- let scanner = match
fluss_table.new_scan().create_record_batch_log_scanner() {
- Ok(a) => a,
- Err(e) => return Err(format!("Failed to create record batch
log scanner: {e}")),
- };
+ let batch_scanner = fluss_table
+ .new_scan()
+ .create_record_batch_log_scanner()
+ .map_err(|e| format!("Failed to create record batch log
scanner: {e}"))?;
+
let scanner = Box::into_raw(Box::new(LogScanner {
inner: None,
- inner_batch: Some(scanner),
+ inner_batch: Some(batch_scanner),
}));
Ok(scanner)
})
@@ -664,18 +663,16 @@ impl Table {
self.table_info.clone(),
);
- let scan = fluss_table.new_scan();
- let scan = match scan.project(&column_indices) {
- Ok(s) => s,
- Err(e) => return Err(format!("Failed to project columns:
{e}")),
- };
- let scanner = match scan.create_record_batch_log_scanner() {
- Ok(a) => a,
- Err(e) => return Err(format!("Failed to create record batch
log scanner: {e}")),
- };
+ let batch_scanner = fluss_table
+ .new_scan()
+ .project(&column_indices)
+ .map_err(|e| format!("Failed to project columns: {e}"))?
+ .create_record_batch_log_scanner()
+ .map_err(|e| format!("Failed to create record batch log
scanner: {e}"))?;
+
let scanner = Box::into_raw(Box::new(LogScanner {
inner: None,
- inner_batch: Some(scanner),
+ inner_batch: Some(batch_scanner),
}));
Ok(scanner)
})
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index a88964e..422f9d3 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -34,8 +34,9 @@ use crate::client::table::log_fetch_buffer::{
LogFetchBuffer, RemotePendingFetch,
};
use crate::client::table::remote_log::{RemoteLogDownloader,
RemoteLogFetchInfo};
+use crate::error::Error::UnsupportedOperation;
use crate::error::{ApiError, Error, FlussError, Result};
-use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
+use crate::metadata::{LogFormat, PhysicalTablePath, TableBucket, TableInfo,
TablePath};
use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket,
PbFetchLogReqForTable};
use crate::record::{
LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords,
to_arrow_schema,
@@ -221,6 +222,7 @@ impl<'a> TableScan<'a> {
}
pub fn create_log_scanner(self) -> Result<LogScanner> {
+ validate_scan_support(&self.table_info.table_path, &self.table_info)?;
let inner = LogScannerInner::new(
&self.table_info,
self.metadata.clone(),
@@ -234,6 +236,7 @@ impl<'a> TableScan<'a> {
}
pub fn create_record_batch_log_scanner(self) ->
Result<RecordBatchLogScanner> {
+ validate_scan_support(&self.table_info.table_path, &self.table_info)?;
let inner = LogScannerInner::new(
&self.table_info,
self.metadata.clone(),
@@ -1556,6 +1559,25 @@ impl BucketScanStatus {
}
}
+fn validate_scan_support(table_path: &TablePath, table_info: &TableInfo) ->
Result<()> {
+ if table_info.schema.primary_key().is_some() {
+ return Err(UnsupportedOperation {
+ message: format!("Table {table_path} is not a Log Table and
doesn't support scan."),
+ });
+ }
+
+ let log_format = table_info.table_config.get_log_format()?;
+ if LogFormat::ARROW != log_format {
+ return Err(UnsupportedOperation {
+ message: format!(
+ "Scan is only supported for ARROW format and table
{table_path} uses {log_format} format"
+ ),
+ });
+ }
+
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -1564,7 +1586,7 @@ mod tests {
use crate::compression::{
ArrowCompressionInfo, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
};
- use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
+ use crate::metadata::{DataTypes, PhysicalTablePath, Schema, TableInfo,
TablePath};
use crate::record::MemoryLogRecordsArrowBuilder;
use crate::row::{Datum, GenericRow};
use crate::rpc::FlussError;
@@ -1781,4 +1803,75 @@ mod tests {
assert!(metadata.get_cluster().leader_for(&bucket).is_none());
Ok(())
}
+
+ fn create_test_table_info(
+ has_primary_key: bool,
+ log_format: Option<&str>,
+ ) -> (TableInfo, TablePath) {
+ let mut schema_builder = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string());
+
+ if has_primary_key {
+ schema_builder = schema_builder.primary_key(vec!["id"]);
+ }
+
+ let schema = schema_builder.build().unwrap();
+ let table_path = TablePath::new("test_db", "test_table");
+
+ let mut properties = HashMap::new();
+ if let Some(format) = log_format {
+ properties.insert("table.log.format".to_string(),
format.to_string());
+ }
+
+ let table_info = TableInfo::new(
+ table_path.clone(),
+ 1,
+ 1,
+ schema,
+ vec![],
+ Arc::from(vec![]),
+ 1,
+ properties,
+ HashMap::new(),
+ None,
+ 0,
+ 0,
+ );
+
+ (table_info, table_path)
+ }
+
+ #[test]
+ fn test_validate_scan_support() {
+ // Primary key table
+ let (table_info, table_path) = create_test_table_info(true,
Some("ARROW"));
+ let result = validate_scan_support(&table_path, &table_info);
+
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(matches!(err, UnsupportedOperation { .. }));
+ assert!(err.to_string().contains(
+ format!("Table {table_path} is not a Log Table and doesn't support
scan.").as_str()
+ ));
+
+ // Indexed format
+ let (table_info, table_path) = create_test_table_info(false,
Some("INDEXED"));
+ let result = validate_scan_support(&table_path, &table_info);
+
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(matches!(err, UnsupportedOperation { .. }));
+ assert!(err.to_string().contains(format!("Scan is only supported for
ARROW format and table {table_path} uses INDEXED format").as_str()));
+
+ // Default format
+ let (table_info, table_path) = create_test_table_info(false, None);
+ let result = validate_scan_support(&table_path, &table_info);
+ assert!(result.is_ok());
+
+ // Arrow format
+ let (table_info, table_path) = create_test_table_info(false,
Some("ARROW"));
+ let result = validate_scan_support(&table_path, &table_info);
+ assert!(result.is_ok());
+ }
}
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 4e0a525..66de2a9 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -969,6 +969,17 @@ impl TableConfig {
kv_format.parse().map_err(Into::into)
}
+ pub fn get_log_format(&self) -> Result<LogFormat> {
+ // TODO: Consolidate configurations logic, constants, defaults in a
single place
+ const DEFAULT_LOG_FORMAT: &str = "ARROW";
+ let log_format = self
+ .properties
+ .get("table.log.format")
+ .map(String::as_str)
+ .unwrap_or(DEFAULT_LOG_FORMAT);
+ LogFormat::parse(log_format)
+ }
+
pub fn get_auto_partition_strategy(&self) -> AutoPartitionStrategy {
AutoPartitionStrategy::from(&self.properties)
}