This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 20ce7a50c feat(datafusion): Add Binary scalar value conversion for predicate pushdown (#2048)
20ce7a50c is described below

commit 20ce7a50c67344564be48d5f1f8fafca2c288854
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Jan 21 09:01:26 2026 +0800

    feat(datafusion): Add Binary scalar value conversion for predicate pushdown (#2048)
    
    ## Which issue does this PR close?
    
    - N/A (no issue number was given).
    
    ## What changes are included in this PR?
    
    Add support for converting Binary and LargeBinary DataFusion ScalarValue
    types to Iceberg Datum, enabling binary predicates to be pushed down to
    the Iceberg storage layer.
    
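    This conversion allows SQL queries with binary hex literals (X'...') to
    push predicates down to Iceberg. This improves query performance because
    Iceberg can filter data at the storage level before it reaches DataFusion.
    The pushdown is partial (inexact), so DataFusion still re-applies the
    predicate in FilterExec; the storage-level filtering happens in addition
    to it, not instead of it.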
    
    The integration test (binary_predicate_pushdown.slt) verifies that binary
    predicates are pushed down end-to-end:
    - Without the conversion: the predicate is evaluated only by FilterExec,
      and IcebergTableScan shows `predicate:[]`
    - With the conversion: IcebergTableScan shows `predicate:[data = 0102]`
    
    Other scalar types (Boolean, Timestamp, Decimal) were investigated but
    excluded because, in practice, their literals never reach the converter:
    - Boolean: DataFusion simplifies comparisons (e.g., `x = true` becomes
      just `x`) before they reach the converter
    - Timestamp/Decimal: SQL literals are converted to strings or other types
      before they reach the converter
    
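    ## Are these changes tested?
    
    Yes. Unit tests were added in `expr_to_predicate.rs`
    (`test_scalar_value_to_datum_binary` and
    `test_predicate_conversion_with_binary`). A new sqllogictest,
    `binary_predicate_pushdown.slt`, checks both the EXPLAIN plan and the
    query results against a new `test_binary_table`.
    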
    ---------
    
    Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
 .../src/physical_plan/expr_to_predicate.rs         | 29 +++++++++
 crates/sqllogictest/src/engine/datafusion.rs       | 28 +++++++++
 .../sqllogictest/testdata/schedules/df_test.toml   |  4 ++
 .../slts/df_test/binary_predicate_pushdown.slt     | 69 ++++++++++++++++++++++
 .../testdata/slts/df_test/show_tables.slt          |  3 +
 5 files changed, 133 insertions(+)

diff --git 
a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs 
b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
index 2974dd264..fc95be5db 100644
--- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
+++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
@@ -212,6 +212,8 @@ fn scalar_value_to_datum(value: &ScalarValue) -> 
Option<Datum> {
         ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
         ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
         ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
+        ScalarValue::Binary(Some(v)) => Some(Datum::binary(v.clone())),
+        ScalarValue::LargeBinary(Some(v)) => Some(Datum::binary(v.clone())),
         ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
         ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) 
as i32)),
         _ => None,
@@ -429,4 +431,31 @@ mod tests {
         let predicate = convert_to_iceberg_predicate(sql);
         assert_eq!(predicate, None);
     }
+
+    #[test]
+    fn test_scalar_value_to_datum_binary() {
+        use datafusion::common::ScalarValue;
+
+        let bytes = vec![1u8, 2u8, 3u8];
+        let datum = 
super::scalar_value_to_datum(&ScalarValue::Binary(Some(bytes.clone())));
+        assert_eq!(datum, Some(Datum::binary(bytes.clone())));
+
+        let datum = 
super::scalar_value_to_datum(&ScalarValue::LargeBinary(Some(bytes.clone())));
+        assert_eq!(datum, Some(Datum::binary(bytes)));
+
+        let datum = super::scalar_value_to_datum(&ScalarValue::Binary(None));
+        assert_eq!(datum, None);
+    }
+
+    #[test]
+    fn test_predicate_conversion_with_binary() {
+        let sql = "foo = 1 and bar = X'0102'";
+        let predicate = convert_to_iceberg_predicate(sql).unwrap();
+        // Binary literals are converted to Datum::binary
+        // Note: SQL literal 1 is converted to Long by DataFusion
+        let expected_predicate = Reference::new("foo")
+            .equal_to(Datum::long(1))
+            .and(Reference::new("bar").equal_to(Datum::binary(vec![1u8, 
2u8])));
+        assert_eq!(predicate, expected_predicate);
+    }
 }
diff --git a/crates/sqllogictest/src/engine/datafusion.rs 
b/crates/sqllogictest/src/engine/datafusion.rs
index 487d8dc97..7af647be9 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -95,6 +95,7 @@ impl DataFusionEngine {
 
         // Create partitioned test table (unpartitioned tables are now created 
via SQL)
         Self::create_partitioned_table(&catalog, &namespace).await?;
+        Self::create_binary_table(&catalog, &namespace).await?;
 
         Ok(Arc::new(
             IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
@@ -134,4 +135,31 @@ impl DataFusionEngine {
 
         Ok(())
     }
+
+    /// Create a test table with binary type column
+    /// Used for testing binary predicate pushdown
+    /// TODO: this can be removed when we support CREATE TABLE
+    async fn create_binary_table(
+        catalog: &impl Catalog,
+        namespace: &NamespaceIdent,
+    ) -> anyhow::Result<()> {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::optional(2, "data", 
Type::Primitive(PrimitiveType::Binary)).into(),
+            ])
+            .build()?;
+
+        catalog
+            .create_table(
+                namespace,
+                TableCreation::builder()
+                    .name("test_binary_table".to_string())
+                    .schema(schema)
+                    .build(),
+            )
+            .await?;
+
+        Ok(())
+    }
 }
diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml 
b/crates/sqllogictest/testdata/schedules/df_test.toml
index 1d7f42c8d..6fdd08054 100644
--- a/crates/sqllogictest/testdata/schedules/df_test.toml
+++ b/crates/sqllogictest/testdata/schedules/df_test.toml
@@ -29,3 +29,7 @@ slt = "df_test/create_table.slt"
 [[steps]]
 engine = "df"
 slt = "df_test/insert_into.slt"
+
+[[steps]]
+engine = "df"
+slt = "df_test/binary_predicate_pushdown.slt"
diff --git 
a/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt 
b/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt
new file mode 100644
index 000000000..d3d1db1b6
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test that binary predicates are pushed down to IcebergTableScan
+# This validates that ScalarValue::Binary is correctly converted to 
Datum::binary
+
+# Verify EXPLAIN shows binary predicate is pushed down to IcebergTableScan
+query TT
+EXPLAIN SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
+----
+logical_plan
+01)Filter: default.default.test_binary_table.data = LargeBinary("1,2")
+02)--TableScan: default.default.test_binary_table projection=[id, data], 
partial_filters=[default.default.test_binary_table.data = LargeBinary("1,2")]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: data@1 = 0102
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,data] predicate:[data = 0102]
+
+# Verify empty result from empty table
+query I?
+SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
+----
+
+# Insert test data
+query I
+INSERT INTO default.default.test_binary_table VALUES (1, X'0102')
+----
+1
+
+# Verify binary predicate filtering works
+query I?
+SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
+----
+1 0102
+
+# Insert more rows with different binary values
+query I
+INSERT INTO default.default.test_binary_table VALUES (2, X'0304'), (3, 
X'0102'), (4, X'0506')
+----
+3
+
+# Verify binary predicate filters correctly
+query I? rowsort
+SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
+----
+1 0102
+3 0102
+
+# Verify different binary value
+query I?
+SELECT * FROM default.default.test_binary_table WHERE data = X'0506'
+----
+4 0506
diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt 
b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
index 770072f9d..a0f0e55b5 100644
--- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
@@ -25,6 +25,9 @@ datafusion information_schema routines VIEW
 datafusion information_schema schemata VIEW
 datafusion information_schema tables VIEW
 datafusion information_schema views VIEW
+default default test_binary_table BASE TABLE
+default default test_binary_table$manifests BASE TABLE
+default default test_binary_table$snapshots BASE TABLE
 default default test_partitioned_table BASE TABLE
 default default test_partitioned_table$manifests BASE TABLE
 default default test_partitioned_table$snapshots BASE TABLE

Reply via email to