This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e1cb194  feat:  support partitioned table in TableScan and TableRead 
(#145)
e1cb194 is described below

commit e1cb194fe72ac4918ede62f42dad23346b02aee4
Author: Zach <[email protected]>
AuthorDate: Wed Mar 25 08:08:05 2026 +0800

    feat:  support partitioned table in TableScan and TableRead (#145)
---
 crates/integration_tests/tests/read_tables.rs | 248 ++++++++++++++++++++++----
 crates/paimon/src/spec/core_options.rs        |  51 +++++-
 crates/paimon/src/spec/data_file.rs           |  49 ++++-
 crates/paimon/src/spec/mod.rs                 |   1 +
 crates/paimon/src/spec/partition_utils.rs     |  81 ++++-----
 crates/paimon/src/table/read_builder.rs       |   8 -
 crates/paimon/src/table/table_scan.rs         |  65 +++----
 dev/spark/provision.py                        |  86 +++++++++
 8 files changed, 466 insertions(+), 123 deletions(-)

diff --git a/crates/integration_tests/tests/read_tables.rs 
b/crates/integration_tests/tests/read_tables.rs
index 69a05f7..f7f7b57 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -9,28 +9,27 @@
 //   http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on
-// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
 
 //! Integration tests for reading Paimon tables provisioned by Spark.
 
-use arrow_array::{Int32Array, StringArray};
+use arrow_array::{Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use paimon::catalog::Identifier;
-use paimon::{Catalog, FileSystemCatalog};
+use paimon::{Catalog, FileSystemCatalog, Plan};
+use std::collections::HashSet;
 
-/// Get the test warehouse path from environment variable or use default.
 fn get_test_warehouse() -> String {
     std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| 
"/tmp/paimon-warehouse".to_string())
 }
 
-async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
+async fn scan_and_read(table_name: &str) -> (Plan, Vec<RecordBatch>) {
     let warehouse = get_test_warehouse();
     let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create 
catalog");
-
     let identifier = Identifier::new("default", table_name);
     let table = catalog
         .get_table(&identifier)
@@ -38,14 +37,13 @@ async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
         .expect("Failed to get table");
 
     let read_builder = table.new_read_builder();
-    let read = read_builder.new_read().expect("Failed to create read");
     let scan = read_builder.new_scan();
     let plan = scan.plan().await.expect("Failed to plan scan");
 
+    let read = read_builder.new_read().expect("Failed to create read");
     let stream = read
         .to_arrow(plan.splits())
         .expect("Failed to create arrow stream");
-
     let batches: Vec<_> = stream
         .try_collect()
         .await
@@ -55,47 +53,57 @@ async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
         !batches.is_empty(),
         "Expected at least one batch from table {table_name}"
     );
+    (plan, batches)
+}
 
-    let mut actual_rows: Vec<(i32, String)> = Vec::new();
-
-    for batch in &batches {
-        let id_array = batch
+fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
+    let mut rows = Vec::new();
+    for batch in batches {
+        let id = batch
             .column_by_name("id")
             .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
-            .expect("Expected Int32Array for id column");
-        let name_array = batch
+            .expect("Expected Int32Array for id");
+        let name = batch
             .column_by_name("name")
             .and_then(|c| c.as_any().downcast_ref::<StringArray>())
-            .expect("Expected StringArray for name column");
-
+            .expect("Expected StringArray for name");
         for i in 0..batch.num_rows() {
-            actual_rows.push((id_array.value(i), 
name_array.value(i).to_string()));
+            rows.push((id.value(i), name.value(i).to_string()));
         }
     }
-
-    actual_rows.sort_by_key(|(id, _)| *id);
-    actual_rows
+    rows.sort_by_key(|(id, _)| *id);
+    rows
 }
 
 #[tokio::test]
 async fn test_read_log_table() {
-    let actual_rows = read_rows("simple_log_table").await;
-    let expected_rows = vec![
+    let (plan, batches) = scan_and_read("simple_log_table").await;
+
+    // Non-partitioned table: partition should be a valid arity=0 BinaryRow
+    // deserialized from manifest bytes, not a stub without backing data.
+    for split in plan.splits() {
+        let partition = split.partition();
+        assert_eq!(partition.arity(), 0);
+        assert!(
+            !partition.is_empty(),
+            "Non-partitioned split should have backing data from manifest 
deserialization"
+        );
+    }
+
+    let actual = extract_id_name(&batches);
+    let expected = vec![
         (1, "alice".to_string()),
         (2, "bob".to_string()),
         (3, "carol".to_string()),
     ];
-
-    assert_eq!(
-        actual_rows, expected_rows,
-        "Rows should match expected values"
-    );
+    assert_eq!(actual, expected, "Rows should match expected values");
 }
 
 #[tokio::test]
 async fn test_read_dv_primary_key_table() {
-    let actual_rows = read_rows("simple_dv_pk_table").await;
-    let expected_rows = vec![
+    let (_, batches) = scan_and_read("simple_dv_pk_table").await;
+    let actual = extract_id_name(&batches);
+    let expected = vec![
         (1, "alice-v2".to_string()),
         (2, "bob-v2".to_string()),
         (3, "carol-v2".to_string()),
@@ -103,9 +111,185 @@ async fn test_read_dv_primary_key_table() {
         (5, "eve-v2".to_string()),
         (6, "frank-v1".to_string()),
     ];
-
     assert_eq!(
-        actual_rows, expected_rows,
+        actual, expected,
         "DV-enabled PK table should only expose the latest row per key"
     );
 }
+
+#[tokio::test]
+async fn test_read_partitioned_log_table() {
+    let (plan, batches) = scan_and_read("partitioned_log_table").await;
+
+    let mut seen_partitions: HashSet<String> = HashSet::new();
+    for split in plan.splits() {
+        let partition = split.partition();
+        assert_eq!(partition.arity(), 1);
+        assert!(!partition.is_empty());
+        let dt = partition.get_string(0).expect("Failed to decode dt");
+        let expected_suffix = format!("dt={dt}/bucket-{}", split.bucket());
+        assert!(
+            split.bucket_path().ends_with(&expected_suffix),
+            "bucket_path should end with '{expected_suffix}', got: {}",
+            split.bucket_path()
+        );
+        seen_partitions.insert(dt.to_string());
+    }
+    assert_eq!(
+        seen_partitions,
+        HashSet::from(["2024-01-01".into(), "2024-01-02".into()])
+    );
+
+    let mut rows: Vec<(i32, String, String)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let dt = batch
+            .column_by_name("dt")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("dt");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), name.value(i).into(), dt.value(i).into()));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into(), "2024-01-01".into()),
+            (2, "bob".into(), "2024-01-01".into()),
+            (3, "carol".into(), "2024-01-02".into()),
+        ]
+    );
+}
+
+#[tokio::test]
+async fn test_read_multi_partitioned_log_table() {
+    let (plan, batches) = scan_and_read("multi_partitioned_log_table").await;
+
+    let mut seen_partitions: HashSet<(String, i32)> = HashSet::new();
+    for split in plan.splits() {
+        let partition = split.partition();
+        assert_eq!(partition.arity(), 2);
+        assert!(!partition.is_empty());
+        let dt = partition.get_string(0).expect("Failed to decode dt");
+        let hr = partition.get_int(1).expect("Failed to decode hr");
+        let expected_suffix = format!("dt={dt}/hr={hr}/bucket-{}", 
split.bucket());
+        assert!(
+            split.bucket_path().ends_with(&expected_suffix),
+            "bucket_path should end with '{expected_suffix}', got: {}",
+            split.bucket_path()
+        );
+        seen_partitions.insert((dt.to_string(), hr));
+    }
+    assert_eq!(
+        seen_partitions,
+        HashSet::from([
+            ("2024-01-01".into(), 10),
+            ("2024-01-01".into(), 20),
+            ("2024-01-02".into(), 10),
+        ])
+    );
+
+    let mut rows: Vec<(i32, String, String, i32)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let dt = batch
+            .column_by_name("dt")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("dt");
+        let hr = batch
+            .column_by_name("hr")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("hr");
+        for i in 0..batch.num_rows() {
+            rows.push((
+                id.value(i),
+                name.value(i).into(),
+                dt.value(i).into(),
+                hr.value(i),
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into(), "2024-01-01".into(), 10),
+            (2, "bob".into(), "2024-01-01".into(), 10),
+            (3, "carol".into(), "2024-01-01".into(), 20),
+            (4, "dave".into(), "2024-01-02".into(), 10),
+        ]
+    );
+}
+
+#[tokio::test]
+async fn test_read_partitioned_dv_pk_table() {
+    let (plan, batches) = scan_and_read("partitioned_dv_pk_table").await;
+
+    // Verify partition metadata on each split.
+    let mut seen_partitions: HashSet<String> = HashSet::new();
+    for split in plan.splits() {
+        let partition = split.partition();
+        assert_eq!(partition.arity(), 1);
+        assert!(!partition.is_empty());
+        let dt = partition.get_string(0).expect("Failed to decode dt");
+        let expected_suffix = format!("dt={dt}/bucket-{}", split.bucket());
+        assert!(
+            split.bucket_path().ends_with(&expected_suffix),
+            "bucket_path should end with '{expected_suffix}', got: {}",
+            split.bucket_path()
+        );
+        seen_partitions.insert(dt.to_string());
+    }
+    assert_eq!(
+        seen_partitions,
+        HashSet::from(["2024-01-01".into(), "2024-01-02".into()])
+    );
+
+    let mut rows: Vec<(i32, String, String)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let dt = batch
+            .column_by_name("dt")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("dt");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), name.value(i).into(), dt.value(i).into()));
+        }
+    }
+    rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice-v2".into(), "2024-01-01".into()),
+            (1, "alice-v1".into(), "2024-01-02".into()),
+            (2, "bob-v2".into(), "2024-01-01".into()),
+            (3, "carol-v2".into(), "2024-01-02".into()),
+            (4, "dave-v2".into(), "2024-01-02".into()),
+        ]
+    );
+}
diff --git a/crates/paimon/src/spec/core_options.rs 
b/crates/paimon/src/spec/core_options.rs
index 484ab57..f6164d0 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -20,8 +20,11 @@ use std::collections::HashMap;
 const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
 const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
 const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
+const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
+const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
 const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
 const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
+const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
 
 /// Typed accessors for common table options.
 ///
@@ -39,7 +42,7 @@ impl<'a> CoreOptions<'a> {
     pub fn deletion_vectors_enabled(&self) -> bool {
         self.options
             .get(DELETION_VECTORS_ENABLED_OPTION)
-            .map(|value| matches!(value.to_ascii_lowercase().as_str(), "true"))
+            .map(|value| value.eq_ignore_ascii_case("true"))
             .unwrap_or(false)
     }
 
@@ -56,6 +59,27 @@ impl<'a> CoreOptions<'a> {
             .and_then(|value| parse_memory_size(value))
             .unwrap_or(DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST)
     }
+
+    /// The default partition name for null/blank partition values.
+    ///
+    /// Corresponds to Java `CoreOptions.PARTITION_DEFAULT_NAME`.
+    pub fn partition_default_name(&self) -> &str {
+        self.options
+            .get(PARTITION_DEFAULT_NAME_OPTION)
+            .map(String::as_str)
+            .unwrap_or(DEFAULT_PARTITION_DEFAULT_NAME)
+    }
+
+    /// Whether to use legacy partition name formatting (toString semantics).
+    ///
+    /// Corresponds to Java `CoreOptions.PARTITION_GENERATE_LEGACY_NAME`.
+    /// Default: `true` to match Java Paimon.
+    pub fn legacy_partition_name(&self) -> bool {
+        self.options
+            .get(PARTITION_LEGACY_NAME_OPTION)
+            .map(|v| v.eq_ignore_ascii_case("true"))
+            .unwrap_or(true)
+    }
 }
 
 /// Parse a memory size string to bytes using binary (1024-based) semantics.
@@ -131,4 +155,29 @@ mod tests {
         assert_eq!(parse_memory_size(""), None);
         assert_eq!(parse_memory_size("abc"), None);
     }
+
+    #[test]
+    fn test_partition_options_defaults() {
+        let options = HashMap::new();
+        let core = CoreOptions::new(&options);
+        assert_eq!(core.partition_default_name(), "__DEFAULT_PARTITION__");
+        assert!(core.legacy_partition_name());
+    }
+
+    #[test]
+    fn test_partition_options_custom() {
+        let options = HashMap::from([
+            (
+                PARTITION_DEFAULT_NAME_OPTION.to_string(),
+                "NULL_PART".to_string(),
+            ),
+            (
+                PARTITION_LEGACY_NAME_OPTION.to_string(),
+                "false".to_string(),
+            ),
+        ]);
+        let core = CoreOptions::new(&options);
+        assert_eq!(core.partition_default_name(), "NULL_PART");
+        assert!(!core.legacy_partition_name());
+    }
 }
diff --git a/crates/paimon/src/spec/data_file.rs 
b/crates/paimon/src/spec/data_file.rs
index 9934e63..8733b1d 100644
--- a/crates/paimon/src/spec/data_file.rs
+++ b/crates/paimon/src/spec/data_file.rs
@@ -83,10 +83,11 @@ impl BinaryRow {
         }
     }
 
-    /// Create a BinaryRow from raw binary bytes (e.g. from 
`ManifestEntry._PARTITION`).
+    /// Create a BinaryRow from raw binary bytes.
     ///
     /// The `data` must contain the full binary row content:
     /// header + null bit set + fixed-length part + variable-length part.
+    /// Does NOT include the 4-byte arity prefix (use `from_serialized_bytes` 
for that).
     pub fn from_bytes(arity: i32, data: Vec<u8>) -> Self {
         let null_bits_size_in_bytes = Self::cal_bit_set_width_in_bytes(arity);
         Self {
@@ -96,6 +97,28 @@ impl BinaryRow {
         }
     }
 
+    /// Create a BinaryRow from Paimon's serialized format (e.g. 
`ManifestEntry._PARTITION`).
+    ///
+    /// Java `SerializationUtils.serializeBinaryRow()` prepends a 4-byte 
big-endian arity
+    /// before the raw BinaryRow content. This method reads that prefix and 
strips it,
+    /// matching Java `SerializationUtils.deserializeBinaryRow()`.
+    ///
+    /// Variable-length field offsets inside the BinaryRow content are 
relative to the
+    /// BinaryRow base, so they remain valid after stripping the 4-byte prefix.
+    pub fn from_serialized_bytes(data: &[u8]) -> crate::Result<Self> {
+        if data.len() < 4 {
+            return Err(crate::Error::UnexpectedError {
+                message: format!(
+                    "BinaryRow: serialized data too short for arity prefix: {} 
bytes",
+                    data.len()
+                ),
+                source: None,
+            });
+        }
+        let arity = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
+        Ok(Self::from_bytes(arity, data[4..].to_vec()))
+    }
+
     /// Number of fields in this row.
     pub fn arity(&self) -> i32 {
         self.arity
@@ -591,6 +614,30 @@ mod tests {
         assert_eq!(BinaryRow::cal_bit_set_width_in_bytes(57), 16);
     }
 
+    #[test]
+    fn test_from_serialized_bytes() {
+        // Build a raw BinaryRow with arity=1, int value 42, then prepend 
4-byte BE arity prefix
+        // to simulate Java SerializationUtils.serializeBinaryRow() format.
+        let mut builder = BinaryRowBuilder::new(1);
+        builder.write_int(0, 42);
+        let raw_row = builder.build();
+        let raw_data = raw_row.data();
+
+        let mut serialized = Vec::with_capacity(4 + raw_data.len());
+        serialized.extend_from_slice(&1_i32.to_be_bytes());
+        serialized.extend_from_slice(raw_data);
+
+        let row = BinaryRow::from_serialized_bytes(&serialized).unwrap();
+        assert_eq!(row.arity(), 1);
+        assert!(!row.is_null_at(0));
+        assert_eq!(row.get_int(0).unwrap(), 42);
+    }
+
+    #[test]
+    fn test_from_serialized_bytes_too_short() {
+        assert!(BinaryRow::from_serialized_bytes(&[0, 0]).is_err());
+    }
+
     #[test]
     fn test_get_int() {
         let mut builder = BinaryRowBuilder::new(2);
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index f1ef422..83af8be 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -54,3 +54,4 @@ pub(crate) mod stats;
 mod types;
 pub use types::*;
 mod partition_utils;
+pub(crate) use partition_utils::PartitionComputer;
diff --git a/crates/paimon/src/spec/partition_utils.rs 
b/crates/paimon/src/spec/partition_utils.rs
index 418ecfc..fa41cfc 100644
--- a/crates/paimon/src/spec/partition_utils.rs
+++ b/crates/paimon/src/spec/partition_utils.rs
@@ -29,10 +29,6 @@ use crate::spec::BinaryRow;
 use crate::spec::DataField;
 use chrono::{Local, NaiveDate, NaiveDateTime, TimeZone, Timelike};
 
-// TODO: remove after #131 consumes the pub(crate) API.
-#[allow(dead_code)]
-pub const DEFAULT_PARTITION_NAME: &str = "__DEFAULT_PARTITION__";
-
 const MILLIS_PER_DAY: i64 = 86_400_000;
 
 /// Computes partition string values and directory paths from a partition 
`BinaryRow`.
@@ -42,8 +38,6 @@ const MILLIS_PER_DAY: i64 = 86_400_000;
 /// (escaped directory path).
 ///
 /// Reference: `org.apache.paimon.utils.InternalRowPartitionComputer` in Java 
Paimon.
-// TODO: remove after #131 consumes the pub(crate) API.
-#[allow(dead_code)]
 pub(crate) struct PartitionComputer {
     partition_keys: Vec<String>,
     partition_fields: Vec<DataField>,
@@ -51,7 +45,6 @@ pub(crate) struct PartitionComputer {
     legacy_partition_name: bool,
 }
 
-#[allow(dead_code)]
 impl PartitionComputer {
     /// Create a new `PartitionComputer`.
     ///
@@ -155,28 +148,6 @@ impl PartitionComputer {
     }
 }
 
-/// Backward-compatible free function that delegates to `PartitionComputer`.
-// TODO: remove after #131 consumes the pub(crate) API.
-#[allow(dead_code)]
-pub(crate) fn generate_partition_path(
-    partition_keys: &[String],
-    schema_fields: &[DataField],
-    row: &BinaryRow,
-    default_partition_name: &str,
-    legacy_partition_name: bool,
-) -> crate::Result<String> {
-    if partition_keys.is_empty() {
-        return Ok(String::new());
-    }
-    let computer = PartitionComputer::new(
-        partition_keys,
-        schema_fields,
-        default_partition_name,
-        legacy_partition_name,
-    )?;
-    computer.generate_partition_path(row)
-}
-
 /// Resolve the `DataField` for each partition key from the schema fields, 
preserving order.
 fn resolve_partition_fields<'a>(
     partition_keys: &[String],
@@ -622,6 +593,8 @@ mod tests {
         DataField::new(0, name.to_string(), data_type)
     }
 
+    const TEST_DEFAULT_PARTITION_NAME: &str = "__DEFAULT_PARTITION__";
+
     /// Helper: assert single-column partition path for a given type and row 
writer.
     fn assert_single_partition<F>(
         name: &str,
@@ -634,11 +607,12 @@ mod tests {
     {
         let fields = vec![make_field(name, data_type)];
         let keys = vec![name.to_string()];
+        let computer =
+            PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, legacy).unwrap();
         let mut builder = TestRowBuilder::new(1);
         write_fn(&mut builder);
         let row = builder.build();
-        let result =
-            generate_partition_path(&keys, &fields, &row, 
DEFAULT_PARTITION_NAME, legacy).unwrap();
+        let result = computer.generate_partition_path(&row).unwrap();
         assert_eq!(result, expected);
     }
 
@@ -649,12 +623,12 @@ mod tests {
     {
         let fields = vec![make_field(name, data_type)];
         let keys = vec![name.to_string()];
+        let computer =
+            PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, legacy).unwrap();
         let mut builder = TestRowBuilder::new(1);
         write_fn(&mut builder);
         let row = builder.build();
-        assert!(
-            generate_partition_path(&keys, &fields, &row, 
DEFAULT_PARTITION_NAME, legacy).is_err()
-        );
+        assert!(computer.generate_partition_path(&row).is_err());
     }
 
     // ======================== Escape tests ========================
@@ -689,7 +663,7 @@ mod tests {
         ];
         let keys = vec!["dt".to_string(), "hr".to_string()];
         let computer =
-            PartitionComputer::new(&keys, &fields, DEFAULT_PARTITION_NAME, 
true).unwrap();
+            PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
 
         let mut builder = TestRowBuilder::new(2);
         builder.write_string(0, "2024-01-01");
@@ -710,7 +684,7 @@ mod tests {
         ];
         let keys = vec!["dt".to_string(), "hr".to_string()];
         let computer =
-            PartitionComputer::new(&keys, &fields, DEFAULT_PARTITION_NAME, 
true).unwrap();
+            PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
 
         let mut builder = TestRowBuilder::new(2);
         builder.write_string(0, "2024-01-01");
@@ -726,7 +700,8 @@ mod tests {
     #[test]
     fn test_empty_partition_keys() {
         let row = BinaryRow::new(0);
-        let result = generate_partition_path(&[], &[], &row, 
DEFAULT_PARTITION_NAME, true).unwrap();
+        let computer = PartitionComputer::new(&[], &[], 
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
+        let result = computer.generate_partition_path(&row).unwrap();
         assert_eq!(result, "");
     }
 
@@ -748,14 +723,15 @@ mod tests {
             make_field("hr", DataType::Int(IntType::new())),
         ];
         let keys = vec!["dt".to_string(), "hr".to_string()];
+        let computer =
+            PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
 
         let mut builder = TestRowBuilder::new(2);
         builder.write_string(0, "2024-01-01");
         builder.write_int(1, 12);
         let row = builder.build();
 
-        let result =
-            generate_partition_path(&keys, &fields, &row, 
DEFAULT_PARTITION_NAME, true).unwrap();
+        let result = computer.generate_partition_path(&row).unwrap();
         assert_eq!(result, "dt=2024-01-01/hr=12/");
     }
 
@@ -763,13 +739,14 @@ mod tests {
     fn test_null_partition_value() {
         let fields = vec![make_field("dt", 
DataType::VarChar(VarCharType::default()))];
         let keys = vec!["dt".to_string()];
+        let computer =
+            PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
 
         let mut builder = TestRowBuilder::new(1);
         builder.set_null_at(0);
         let row = builder.build();
 
-        let result =
-            generate_partition_path(&keys, &fields, &row, 
DEFAULT_PARTITION_NAME, true).unwrap();
+        let result = computer.generate_partition_path(&row).unwrap();
         assert_eq!(result, "dt=__DEFAULT_PARTITION__/");
     }
 
@@ -952,8 +929,12 @@ mod tests {
         builder.write_int(0, 1);
         let row = builder.build();
 
-        let result = generate_partition_path(&keys, &fields, &row, 
DEFAULT_PARTITION_NAME, true);
-        assert!(result.is_err());
+        let result = PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true);
+        // Construction succeeds (field resolution fails for hr), or path 
generation fails due to arity mismatch
+        match result {
+            Err(_) => {} // field resolution failed — expected
+            Ok(computer) => 
assert!(computer.generate_partition_path(&row).is_err()),
+        }
     }
 
     #[test]
@@ -961,11 +942,7 @@ mod tests {
         let fields = vec![make_field("other", DataType::Int(IntType::new()))];
         let keys = vec!["dt".to_string()];
 
-        let mut builder = TestRowBuilder::new(1);
-        builder.write_int(0, 1);
-        let row = builder.build();
-
-        let result = generate_partition_path(&keys, &fields, &row, 
DEFAULT_PARTITION_NAME, true);
+        let result = PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true);
         assert!(result.is_err());
     }
 
@@ -1005,9 +982,11 @@ mod tests {
     fn test_empty_row_with_partition_keys() {
         let fields = vec![make_field("dt", DataType::Int(IntType::new()))];
         let keys = vec!["dt".to_string()];
+        let computer =
+            PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
         let row = BinaryRow::new(1); // empty backing data
 
-        let result = generate_partition_path(&keys, &fields, &row, 
DEFAULT_PARTITION_NAME, true);
+        let result = computer.generate_partition_path(&row);
         assert!(result.is_err());
     }
 
@@ -1059,11 +1038,13 @@ mod tests {
     fn test_truncated_row_returns_error() {
         let fields = vec![make_field("dt", DataType::Int(IntType::new()))];
         let keys = vec!["dt".to_string()];
+        let computer =
+            PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
 
         // Create a BinaryRow with arity=1 but truncated backing data (too 
short).
         let row = BinaryRow::from_bytes(1, vec![0u8; 4]); // needs >= 16 bytes
 
-        let result = generate_partition_path(&keys, &fields, &row, 
DEFAULT_PARTITION_NAME, true);
+        let result = computer.generate_partition_path(&row);
         assert!(result.is_err());
         let msg = result.unwrap_err().to_string();
         assert!(msg.contains("too short"), "Expected 'too short' in: {msg}");
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index dded84c..1f640cd 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -90,14 +90,6 @@ impl<'a> TableRead<'a> {
             });
         }
 
-        if !self.table.schema.partition_keys().is_empty() {
-            return Err(Error::Unsupported {
-                message: format!(
-                    "Reading partitioned tables is not yet supported. 
Partition keys: {:?}",
-                    self.table.schema.partition_keys()
-                ),
-            });
-        }
         let reader =
             
ArrowReaderBuilder::new(self.table.file_io.clone()).build(self.read_type().to_vec());
         reader.read(data_splits)
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 5d5bc4a..1c3454a 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -22,7 +22,9 @@
 
 use super::Table;
 use crate::io::FileIO;
-use crate::spec::{BinaryRow, CoreOptions, FileKind, IndexManifest, 
ManifestEntry, Snapshot};
+use crate::spec::{
+    BinaryRow, CoreOptions, FileKind, IndexManifest, ManifestEntry, 
PartitionComputer, Snapshot,
+};
 use crate::table::bin_pack::split_for_batch;
 use crate::table::source::{DataSplitBuilder, DeletionFile, PartitionBucket, 
Plan};
 use crate::table::SnapshotManager;
@@ -172,29 +174,16 @@ impl<'a> TableScan<'a> {
             Some(s) => s,
             None => return Ok(Plan::new(Vec::new())),
         };
+        self.plan_snapshot(snapshot).await
+    }
+
+    async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result<Plan> {
+        let file_io = self.table.file_io();
+        let table_path = self.table.location();
         let core_options = CoreOptions::new(self.table.schema().options());
+        let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
         let target_split_size = core_options.source_split_target_size();
         let open_file_cost = core_options.source_split_open_file_cost();
-        let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
-        Self::plan_snapshot(
-            snapshot,
-            file_io,
-            table_path,
-            target_split_size,
-            open_file_cost,
-            deletion_vectors_enabled,
-        )
-        .await
-    }
-
-    async fn plan_snapshot(
-        snapshot: Snapshot,
-        file_io: &FileIO,
-        table_path: &str,
-        target_split_size: i64,
-        open_file_cost: i64,
-        deletion_vectors_enabled: bool,
-    ) -> crate::Result<Plan> {
         let entries = read_all_manifest_entries(file_io, table_path, 
&snapshot).await?;
         let entries = filter_manifest_entries(entries, 
deletion_vectors_enabled);
         let entries = merge_manifest_entries(entries);
@@ -210,13 +199,24 @@ impl<'a> TableScan<'a> {
         }
 
         let snapshot_id = snapshot.id();
-        let base_path = table_path;
+        let base_path = table_path.trim_end_matches('/');
         let mut splits = Vec::new();
 
+        let partition_keys = self.table.schema().partition_keys();
+        let partition_computer = if !partition_keys.is_empty() {
+            Some(PartitionComputer::new(
+                partition_keys,
+                self.table.schema().fields(),
+                core_options.partition_default_name(),
+                core_options.legacy_partition_name(),
+            )?)
+        } else {
+            None
+        };
+
         // Read deletion vector index manifest once (like Java generateSplits 
/ scanDvIndex).
         let deletion_files_map = if let Some(index_manifest_name) = 
snapshot.index_manifest() {
-            let index_manifest_path =
-                format!("{}/{}", base_path.trim_end_matches('/'), 
MANIFEST_DIR);
+            let index_manifest_path = format!("{base_path}/{MANIFEST_DIR}");
             let path = format!("{index_manifest_path}/{index_manifest_name}");
             let index_entries = IndexManifest::read(file_io, &path).await?;
             Some(build_deletion_files_map(&index_entries, base_path))
@@ -225,6 +225,8 @@ impl<'a> TableScan<'a> {
         };
 
         for ((partition, bucket), group_entries) in groups {
+            let partition_row = BinaryRow::from_serialized_bytes(&partition)?;
+
             let total_buckets = group_entries
                 .first()
                 .map(|e| e.total_buckets())
@@ -240,18 +242,20 @@ impl<'a> TableScan<'a> {
                 })
                 .collect();
 
-            // todo: consider partitioned table
-            let bucket_path = format!("{base_path}/bucket-{bucket}");
+            let bucket_path = if let Some(ref computer) = partition_computer {
+                let partition_path = 
computer.generate_partition_path(&partition_row)?;
+                format!("{base_path}/{partition_path}bucket-{bucket}")
+            } else {
+                format!("{base_path}/bucket-{bucket}")
+            };
 
-            // Get the per-bucket deletion file map for looking up by file 
name after bin packing.
+            // Original `partition` Vec consumed by PartitionBucket for DV map 
lookup.
             let per_bucket_deletion_map = deletion_files_map
                 .as_ref()
                 .and_then(|map| map.get(&PartitionBucket::new(partition, 
bucket)));
 
             let file_groups = split_for_batch(data_files, target_split_size, 
open_file_cost);
             for file_group in file_groups {
-                // Build deletion files list for this specific file group (by 
file name lookup),
-                // matching Python's _get_deletion_files_for_split.
                 let data_deletion_files = 
per_bucket_deletion_map.map(|per_bucket| {
                     file_group
                         .iter()
@@ -261,8 +265,7 @@ impl<'a> TableScan<'a> {
 
                 let mut builder = DataSplitBuilder::new()
                     .with_snapshot(snapshot_id)
-                    // todo: consider pass real partition
-                    .with_partition(BinaryRow::new(0))
+                    .with_partition(partition_row.clone())
                     .with_bucket(bucket)
                     .with_bucket_path(bucket_path.clone())
                     .with_total_buckets(total_buckets)
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 21a7a52..a813238 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -110,6 +110,92 @@ def main():
         """
     )
 
+    # ===== Partitioned table: single partition key (dt) =====
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS partitioned_log_table (
+            id INT,
+            name STRING,
+            dt STRING
+        ) USING paimon
+        PARTITIONED BY (dt)
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO partitioned_log_table VALUES
+            (1, 'alice', '2024-01-01'),
+            (2, 'bob', '2024-01-01'),
+            (3, 'carol', '2024-01-02')
+        """
+    )
+
+    # ===== Partitioned table: multiple partition keys (dt, hr) =====
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS multi_partitioned_log_table (
+            id INT,
+            name STRING,
+            dt STRING,
+            hr INT
+        ) USING paimon
+        PARTITIONED BY (dt, hr)
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO multi_partitioned_log_table VALUES
+            (1, 'alice', '2024-01-01', 10),
+            (2, 'bob', '2024-01-01', 10),
+            (3, 'carol', '2024-01-01', 20),
+            (4, 'dave', '2024-01-02', 10)
+    """
+    )
+
+    # ===== Partitioned table: PK + DV enabled =====
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS partitioned_dv_pk_table (
+            id INT,
+            name STRING,
+            dt STRING
+        ) USING paimon
+        PARTITIONED BY (dt)
+        TBLPROPERTIES (
+            'primary-key' = 'id,dt',
+            'bucket' = '1',
+            'deletion-vectors.enabled' = 'true'
+        )
+        """
+    )
+
+    spark.sql(
+        """
+        INSERT INTO partitioned_dv_pk_table VALUES
+            (1, 'alice-v1', '2024-01-01'),
+            (2, 'bob-v1', '2024-01-01'),
+            (1, 'alice-v1', '2024-01-02'),
+            (3, 'carol-v1', '2024-01-02')
+        """
+    )
+
+    spark.sql(
+        """
+        INSERT INTO partitioned_dv_pk_table VALUES
+            (1, 'alice-v2', '2024-01-01'),
+            (3, 'carol-v2', '2024-01-02'),
+            (4, 'dave-v1', '2024-01-02')
+        """
+    )
+
+    spark.sql(
+        """
+        INSERT INTO partitioned_dv_pk_table VALUES
+            (2, 'bob-v2', '2024-01-01'),
+            (4, 'dave-v2', '2024-01-02')
+        """
+    )
+
 
 if __name__ == "__main__":
     main()


Reply via email to