This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 20ce7a50c feat(datafusion): Add Binary scalar value conversion for
predicate pushdown (#2048)
20ce7a50c is described below
commit 20ce7a50c67344564be48d5f1f8fafca2c288854
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Jan 21 09:01:26 2026 +0800
feat(datafusion): Add Binary scalar value conversion for predicate pushdown
(#2048)
## Which issue does this PR close?
- Closes #.
## What changes are included in this PR?
Add support for converting Binary and LargeBinary DataFusion ScalarValue
types to Iceberg Datum, enabling binary predicates to be pushed down to
the Iceberg storage layer.
This conversion allows SQL queries with binary hex literals (X'...') to
push predicates down to Iceberg, improving query performance by
filtering data at the storage level rather than in DataFusion.
The integration test verifies that binary predicates are successfully
pushed down end-to-end:
- Without conversion: the predicate is applied only in FilterExec, and
IcebergTableScan reports an empty predicate:[]
- With conversion: the predicate is also pushed down to IcebergTableScan
(e.g. predicate:[data = 0102])
Other scalar types (Boolean, Timestamp, Decimal) were investigated but
excluded because they are not reachable through practical usage:
- Boolean: DataFusion aggressively simplifies boolean comparisons (e.g.,
x=true becomes just x) before they reach the converter
- Timestamp/Decimal: SQL literals of these types are converted to strings
or other types before they reach the converter
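## Are these changes tested?
Yes. Unit tests in expr_to_predicate.rs cover the Binary and LargeBinary
conversions, and the new sqllogictest binary_predicate_pushdown.slt
verifies the pushdown end-to-end.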
---------
Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
.../src/physical_plan/expr_to_predicate.rs | 29 +++++++++
crates/sqllogictest/src/engine/datafusion.rs | 28 +++++++++
.../sqllogictest/testdata/schedules/df_test.toml | 4 ++
.../slts/df_test/binary_predicate_pushdown.slt | 69 ++++++++++++++++++++++
.../testdata/slts/df_test/show_tables.slt | 3 +
5 files changed, 133 insertions(+)
diff --git
a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
index 2974dd264..fc95be5db 100644
--- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
+++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
@@ -212,6 +212,8 @@ fn scalar_value_to_datum(value: &ScalarValue) ->
Option<Datum> {
ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
+ ScalarValue::Binary(Some(v)) => Some(Datum::binary(v.clone())),
+ ScalarValue::LargeBinary(Some(v)) => Some(Datum::binary(v.clone())),
ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY)
as i32)),
_ => None,
@@ -429,4 +431,31 @@ mod tests {
let predicate = convert_to_iceberg_predicate(sql);
assert_eq!(predicate, None);
}
+
+ #[test]
+ fn test_scalar_value_to_datum_binary() {
+ use datafusion::common::ScalarValue;
+
+ let bytes = vec![1u8, 2u8, 3u8];
+ let datum =
super::scalar_value_to_datum(&ScalarValue::Binary(Some(bytes.clone())));
+ assert_eq!(datum, Some(Datum::binary(bytes.clone())));
+
+ let datum =
super::scalar_value_to_datum(&ScalarValue::LargeBinary(Some(bytes.clone())));
+ assert_eq!(datum, Some(Datum::binary(bytes)));
+
+ let datum = super::scalar_value_to_datum(&ScalarValue::Binary(None));
+ assert_eq!(datum, None);
+ }
+
+ #[test]
+ fn test_predicate_conversion_with_binary() {
+ let sql = "foo = 1 and bar = X'0102'";
+ let predicate = convert_to_iceberg_predicate(sql).unwrap();
+ // Binary literals are converted to Datum::binary
+ // Note: SQL literal 1 is converted to Long by DataFusion
+ let expected_predicate = Reference::new("foo")
+ .equal_to(Datum::long(1))
+ .and(Reference::new("bar").equal_to(Datum::binary(vec![1u8,
2u8])));
+ assert_eq!(predicate, expected_predicate);
+ }
}
diff --git a/crates/sqllogictest/src/engine/datafusion.rs
b/crates/sqllogictest/src/engine/datafusion.rs
index 487d8dc97..7af647be9 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -95,6 +95,7 @@ impl DataFusionEngine {
// Create partitioned test table (unpartitioned tables are now created
via SQL)
Self::create_partitioned_table(&catalog, &namespace).await?;
+ Self::create_binary_table(&catalog, &namespace).await?;
Ok(Arc::new(
IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
@@ -134,4 +135,31 @@ impl DataFusionEngine {
Ok(())
}
+
+ /// Create a test table with binary type column
+ /// Used for testing binary predicate pushdown
+ /// TODO: this can be removed when we support CREATE TABLE
+ async fn create_binary_table(
+ catalog: &impl Catalog,
+ namespace: &NamespaceIdent,
+ ) -> anyhow::Result<()> {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "data",
Type::Primitive(PrimitiveType::Binary)).into(),
+ ])
+ .build()?;
+
+ catalog
+ .create_table(
+ namespace,
+ TableCreation::builder()
+ .name("test_binary_table".to_string())
+ .schema(schema)
+ .build(),
+ )
+ .await?;
+
+ Ok(())
+ }
}
diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml
b/crates/sqllogictest/testdata/schedules/df_test.toml
index 1d7f42c8d..6fdd08054 100644
--- a/crates/sqllogictest/testdata/schedules/df_test.toml
+++ b/crates/sqllogictest/testdata/schedules/df_test.toml
@@ -29,3 +29,7 @@ slt = "df_test/create_table.slt"
[[steps]]
engine = "df"
slt = "df_test/insert_into.slt"
+
+[[steps]]
+engine = "df"
+slt = "df_test/binary_predicate_pushdown.slt"
diff --git
a/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt
b/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt
new file mode 100644
index 000000000..d3d1db1b6
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test that binary predicates are pushed down to IcebergTableScan
+# This validates that ScalarValue::Binary is correctly converted to
Datum::binary
+
+# Verify EXPLAIN shows binary predicate is pushed down to IcebergTableScan
+query TT
+EXPLAIN SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
+----
+logical_plan
+01)Filter: default.default.test_binary_table.data = LargeBinary("1,2")
+02)--TableScan: default.default.test_binary_table projection=[id, data],
partial_filters=[default.default.test_binary_table.data = LargeBinary("1,2")]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: data@1 = 0102
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,data] predicate:[data = 0102]
+
+# Verify empty result from empty table
+query I?
+SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
+----
+
+# Insert test data
+query I
+INSERT INTO default.default.test_binary_table VALUES (1, X'0102')
+----
+1
+
+# Verify binary predicate filtering works
+query I?
+SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
+----
+1 0102
+
+# Insert more rows with different binary values
+query I
+INSERT INTO default.default.test_binary_table VALUES (2, X'0304'), (3,
X'0102'), (4, X'0506')
+----
+3
+
+# Verify binary predicate filters correctly
+query I? rowsort
+SELECT * FROM default.default.test_binary_table WHERE data = X'0102'
+----
+1 0102
+3 0102
+
+# Verify different binary value
+query I?
+SELECT * FROM default.default.test_binary_table WHERE data = X'0506'
+----
+4 0506
diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
index 770072f9d..a0f0e55b5 100644
--- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
@@ -25,6 +25,9 @@ datafusion information_schema routines VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
+default default test_binary_table BASE TABLE
+default default test_binary_table$manifests BASE TABLE
+default default test_binary_table$snapshots BASE TABLE
default default test_partitioned_table BASE TABLE
default default test_partitioned_table$manifests BASE TABLE
default default test_partitioned_table$snapshots BASE TABLE