This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5574d55  chore: Reject table scan for INDEXED format or KV table (#238)
5574d55 is described below

commit 5574d55de4a443ba1a1e3d17c3e550e86c6ca041
Author: Keith Lee <[email protected]>
AuthorDate: Wed Feb 4 12:45:08 2026 +0000

    chore: Reject table scan for INDEXED format or KV table (#238)
---
 bindings/cpp/src/lib.rs                  | 55 +++++++++---------
 crates/fluss/src/client/table/scanner.rs | 97 +++++++++++++++++++++++++++++++-
 crates/fluss/src/metadata/table.rs       | 11 ++++
 3 files changed, 132 insertions(+), 31 deletions(-)

diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 7ae6416..ab02c8d 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -591,10 +591,10 @@ impl Table {
                 self.table_info.clone(),
             );
 
-            let scanner = match fluss_table.new_scan().create_log_scanner() {
-                Ok(a) => a,
-                Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-            };
+            let scanner = fluss_table
+                .new_scan()
+                .create_log_scanner()
+                .map_err(|e| format!("Failed to create log scanner: {e}"))?;
 
             let scanner_ptr = Box::into_raw(Box::new(LogScanner {
                 inner: Some(scanner),
@@ -616,17 +616,15 @@ impl Table {
                 self.table_info.clone(),
             );
 
-            let scan = fluss_table.new_scan();
-            let scan = match scan.project(&column_indices) {
-                Ok(s) => s,
-                Err(e) => return Err(format!("Failed to project columns: {e}")),
-            };
-            let scanner = match scan.create_log_scanner() {
-                Ok(a) => a,
-                Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-            };
+            let log_scanner = fluss_table
+                .new_scan()
+                .project(&column_indices)
+                .map_err(|e| format!("Failed to project columns: {e}"))?
+                .create_log_scanner()
+                .map_err(|e| format!("Failed to create log scanner: {e}"))?;
+
             let scanner = Box::into_raw(Box::new(LogScanner {
-                inner: Some(scanner),
+                inner: Some(log_scanner),
                 inner_batch: None,
             }));
             Ok(scanner)
@@ -641,13 +639,14 @@ impl Table {
                 self.table_info.clone(),
             );
 
-            let scanner = match fluss_table.new_scan().create_record_batch_log_scanner() {
-                Ok(a) => a,
-                Err(e) => return Err(format!("Failed to create record batch log scanner: {e}")),
-            };
+            let batch_scanner = fluss_table
+                .new_scan()
+                .create_record_batch_log_scanner()
+                .map_err(|e| format!("Failed to create record batch log scanner: {e}"))?;
+
             let scanner = Box::into_raw(Box::new(LogScanner {
                 inner: None,
-                inner_batch: Some(scanner),
+                inner_batch: Some(batch_scanner),
             }));
             Ok(scanner)
         })
@@ -664,18 +663,16 @@ impl Table {
                 self.table_info.clone(),
             );
 
-            let scan = fluss_table.new_scan();
-            let scan = match scan.project(&column_indices) {
-                Ok(s) => s,
-                Err(e) => return Err(format!("Failed to project columns: {e}")),
-            };
-            let scanner = match scan.create_record_batch_log_scanner() {
-                Ok(a) => a,
-                Err(e) => return Err(format!("Failed to create record batch log scanner: {e}")),
-            };
+            let batch_scanner = fluss_table
+                .new_scan()
+                .project(&column_indices)
+                .map_err(|e| format!("Failed to project columns: {e}"))?
+                .create_record_batch_log_scanner()
+                .map_err(|e| format!("Failed to create record batch log scanner: {e}"))?;
+
             let scanner = Box::into_raw(Box::new(LogScanner {
                 inner: None,
-                inner_batch: Some(scanner),
+                inner_batch: Some(batch_scanner),
             }));
             Ok(scanner)
         })
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index a88964e..422f9d3 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -34,8 +34,9 @@ use crate::client::table::log_fetch_buffer::{
     LogFetchBuffer, RemotePendingFetch,
 };
use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
+use crate::error::Error::UnsupportedOperation;
 use crate::error::{ApiError, Error, FlussError, Result};
-use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
+use crate::metadata::{LogFormat, PhysicalTablePath, TableBucket, TableInfo, TablePath};
use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{
    LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords, to_arrow_schema,
@@ -221,6 +222,7 @@ impl<'a> TableScan<'a> {
     }
 
     pub fn create_log_scanner(self) -> Result<LogScanner> {
+        validate_scan_support(&self.table_info.table_path, &self.table_info)?;
         let inner = LogScannerInner::new(
             &self.table_info,
             self.metadata.clone(),
@@ -234,6 +236,7 @@ impl<'a> TableScan<'a> {
     }
 
    pub fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner> {
+        validate_scan_support(&self.table_info.table_path, &self.table_info)?;
         let inner = LogScannerInner::new(
             &self.table_info,
             self.metadata.clone(),
@@ -1556,6 +1559,25 @@ impl BucketScanStatus {
     }
 }
 
+fn validate_scan_support(table_path: &TablePath, table_info: &TableInfo) -> Result<()> {
+    if table_info.schema.primary_key().is_some() {
+        return Err(UnsupportedOperation {
+            message: format!("Table {table_path} is not a Log Table and doesn't support scan."),
+        });
+    }
+
+    let log_format = table_info.table_config.get_log_format()?;
+    if LogFormat::ARROW != log_format {
+        return Err(UnsupportedOperation {
+            message: format!(
+                "Scan is only supported for ARROW format and table {table_path} uses {log_format} format"
+            ),
+        });
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1564,7 +1586,7 @@ mod tests {
     use crate::compression::{
        ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
     };
-    use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
+    use crate::metadata::{DataTypes, PhysicalTablePath, Schema, TableInfo, TablePath};
     use crate::record::MemoryLogRecordsArrowBuilder;
     use crate::row::{Datum, GenericRow};
     use crate::rpc::FlussError;
@@ -1781,4 +1803,75 @@ mod tests {
         assert!(metadata.get_cluster().leader_for(&bucket).is_none());
         Ok(())
     }
+
+    fn create_test_table_info(
+        has_primary_key: bool,
+        log_format: Option<&str>,
+    ) -> (TableInfo, TablePath) {
+        let mut schema_builder = Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string());
+
+        if has_primary_key {
+            schema_builder = schema_builder.primary_key(vec!["id"]);
+        }
+
+        let schema = schema_builder.build().unwrap();
+        let table_path = TablePath::new("test_db", "test_table");
+
+        let mut properties = HashMap::new();
+        if let Some(format) = log_format {
+            properties.insert("table.log.format".to_string(), format.to_string());
+        }
+
+        let table_info = TableInfo::new(
+            table_path.clone(),
+            1,
+            1,
+            schema,
+            vec![],
+            Arc::from(vec![]),
+            1,
+            properties,
+            HashMap::new(),
+            None,
+            0,
+            0,
+        );
+
+        (table_info, table_path)
+    }
+
+    #[test]
+    fn test_validate_scan_support() {
+        // Primary key table
+        let (table_info, table_path) = create_test_table_info(true, Some("ARROW"));
+        let result = validate_scan_support(&table_path, &table_info);
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(matches!(err, UnsupportedOperation { .. }));
+        assert!(err.to_string().contains(
+            format!("Table {table_path} is not a Log Table and doesn't support scan.").as_str()
+        ));
+
+        // Indexed format
+        let (table_info, table_path) = create_test_table_info(false, Some("INDEXED"));
+        let result = validate_scan_support(&table_path, &table_info);
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(matches!(err, UnsupportedOperation { .. }));
+        assert!(err.to_string().contains(format!("Scan is only supported for ARROW format and table {table_path} uses INDEXED format").as_str()));
+
+        // Default format
+        let (table_info, table_path) = create_test_table_info(false, None);
+        let result = validate_scan_support(&table_path, &table_info);
+        assert!(result.is_ok());
+
+        // Arrow format
+        let (table_info, table_path) = create_test_table_info(false, Some("ARROW"));
+        let result = validate_scan_support(&table_path, &table_info);
+        assert!(result.is_ok());
+    }
 }
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 4e0a525..66de2a9 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -969,6 +969,17 @@ impl TableConfig {
         kv_format.parse().map_err(Into::into)
     }
 
+    pub fn get_log_format(&self) -> Result<LogFormat> {
+        // TODO: Consolidate configurations logic, constants, defaults in a single place
+        const DEFAULT_LOG_FORMAT: &str = "ARROW";
+        let log_format = self
+            .properties
+            .get("table.log.format")
+            .map(String::as_str)
+            .unwrap_or(DEFAULT_LOG_FORMAT);
+        LogFormat::parse(log_format)
+    }
+
     pub fn get_auto_partition_strategy(&self) -> AutoPartitionStrategy {
         AutoPartitionStrategy::from(&self.properties)
     }

Reply via email to