This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e1cb194 feat: support partitioned table in TableScan and TableRead (#145)
e1cb194 is described below
commit e1cb194fe72ac4918ede62f42dad23346b02aee4
Author: Zach <[email protected]>
AuthorDate: Wed Mar 25 08:08:05 2026 +0800
feat: support partitioned table in TableScan and TableRead (#145)
---
crates/integration_tests/tests/read_tables.rs | 248 ++++++++++++++++++++++----
crates/paimon/src/spec/core_options.rs | 51 +++++-
crates/paimon/src/spec/data_file.rs | 49 ++++-
crates/paimon/src/spec/mod.rs | 1 +
crates/paimon/src/spec/partition_utils.rs | 81 ++++-----
crates/paimon/src/table/read_builder.rs | 8 -
crates/paimon/src/table/table_scan.rs | 65 +++----
dev/spark/provision.py | 86 +++++++++
8 files changed, 466 insertions(+), 123 deletions(-)
diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index 69a05f7..f7f7b57 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -9,28 +9,27 @@
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on
-// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Integration tests for reading Paimon tables provisioned by Spark.
-use arrow_array::{Int32Array, StringArray};
+use arrow_array::{Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use paimon::catalog::Identifier;
-use paimon::{Catalog, FileSystemCatalog};
+use paimon::{Catalog, FileSystemCatalog, Plan};
+use std::collections::HashSet;
-/// Get the test warehouse path from environment variable or use default.
fn get_test_warehouse() -> String {
std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_|
"/tmp/paimon-warehouse".to_string())
}
-async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
+async fn scan_and_read(table_name: &str) -> (Plan, Vec<RecordBatch>) {
let warehouse = get_test_warehouse();
let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create
catalog");
-
let identifier = Identifier::new("default", table_name);
let table = catalog
.get_table(&identifier)
@@ -38,14 +37,13 @@ async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
.expect("Failed to get table");
let read_builder = table.new_read_builder();
- let read = read_builder.new_read().expect("Failed to create read");
let scan = read_builder.new_scan();
let plan = scan.plan().await.expect("Failed to plan scan");
+ let read = read_builder.new_read().expect("Failed to create read");
let stream = read
.to_arrow(plan.splits())
.expect("Failed to create arrow stream");
-
let batches: Vec<_> = stream
.try_collect()
.await
@@ -55,47 +53,57 @@ async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
!batches.is_empty(),
"Expected at least one batch from table {table_name}"
);
+ (plan, batches)
+}
- let mut actual_rows: Vec<(i32, String)> = Vec::new();
-
- for batch in &batches {
- let id_array = batch
+fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
+ let mut rows = Vec::new();
+ for batch in batches {
+ let id = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
- .expect("Expected Int32Array for id column");
- let name_array = batch
+ .expect("Expected Int32Array for id");
+ let name = batch
.column_by_name("name")
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
- .expect("Expected StringArray for name column");
-
+ .expect("Expected StringArray for name");
for i in 0..batch.num_rows() {
- actual_rows.push((id_array.value(i),
name_array.value(i).to_string()));
+ rows.push((id.value(i), name.value(i).to_string()));
}
}
-
- actual_rows.sort_by_key(|(id, _)| *id);
- actual_rows
+ rows.sort_by_key(|(id, _)| *id);
+ rows
}
#[tokio::test]
async fn test_read_log_table() {
- let actual_rows = read_rows("simple_log_table").await;
- let expected_rows = vec![
+ let (plan, batches) = scan_and_read("simple_log_table").await;
+
+ // Non-partitioned table: partition should be a valid arity=0 BinaryRow
+ // deserialized from manifest bytes, not a stub without backing data.
+ for split in plan.splits() {
+ let partition = split.partition();
+ assert_eq!(partition.arity(), 0);
+ assert!(
+ !partition.is_empty(),
+ "Non-partitioned split should have backing data from manifest
deserialization"
+ );
+ }
+
+ let actual = extract_id_name(&batches);
+ let expected = vec![
(1, "alice".to_string()),
(2, "bob".to_string()),
(3, "carol".to_string()),
];
-
- assert_eq!(
- actual_rows, expected_rows,
- "Rows should match expected values"
- );
+ assert_eq!(actual, expected, "Rows should match expected values");
}
#[tokio::test]
async fn test_read_dv_primary_key_table() {
- let actual_rows = read_rows("simple_dv_pk_table").await;
- let expected_rows = vec![
+ let (_, batches) = scan_and_read("simple_dv_pk_table").await;
+ let actual = extract_id_name(&batches);
+ let expected = vec![
(1, "alice-v2".to_string()),
(2, "bob-v2".to_string()),
(3, "carol-v2".to_string()),
@@ -103,9 +111,185 @@ async fn test_read_dv_primary_key_table() {
(5, "eve-v2".to_string()),
(6, "frank-v1".to_string()),
];
-
assert_eq!(
- actual_rows, expected_rows,
+ actual, expected,
"DV-enabled PK table should only expose the latest row per key"
);
}
+
+#[tokio::test]
+async fn test_read_partitioned_log_table() {
+ let (plan, batches) = scan_and_read("partitioned_log_table").await;
+
+ let mut seen_partitions: HashSet<String> = HashSet::new();
+ for split in plan.splits() {
+ let partition = split.partition();
+ assert_eq!(partition.arity(), 1);
+ assert!(!partition.is_empty());
+ let dt = partition.get_string(0).expect("Failed to decode dt");
+ let expected_suffix = format!("dt={dt}/bucket-{}", split.bucket());
+ assert!(
+ split.bucket_path().ends_with(&expected_suffix),
+ "bucket_path should end with '{expected_suffix}', got: {}",
+ split.bucket_path()
+ );
+ seen_partitions.insert(dt.to_string());
+ }
+ assert_eq!(
+ seen_partitions,
+ HashSet::from(["2024-01-01".into(), "2024-01-02".into()])
+ );
+
+ let mut rows: Vec<(i32, String, String)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let name = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("name");
+ let dt = batch
+ .column_by_name("dt")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("dt");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), name.value(i).into(), dt.value(i).into()));
+ }
+ }
+ rows.sort_by_key(|(id, _, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice".into(), "2024-01-01".into()),
+ (2, "bob".into(), "2024-01-01".into()),
+ (3, "carol".into(), "2024-01-02".into()),
+ ]
+ );
+}
+
+#[tokio::test]
+async fn test_read_multi_partitioned_log_table() {
+ let (plan, batches) = scan_and_read("multi_partitioned_log_table").await;
+
+ let mut seen_partitions: HashSet<(String, i32)> = HashSet::new();
+ for split in plan.splits() {
+ let partition = split.partition();
+ assert_eq!(partition.arity(), 2);
+ assert!(!partition.is_empty());
+ let dt = partition.get_string(0).expect("Failed to decode dt");
+ let hr = partition.get_int(1).expect("Failed to decode hr");
+ let expected_suffix = format!("dt={dt}/hr={hr}/bucket-{}",
split.bucket());
+ assert!(
+ split.bucket_path().ends_with(&expected_suffix),
+ "bucket_path should end with '{expected_suffix}', got: {}",
+ split.bucket_path()
+ );
+ seen_partitions.insert((dt.to_string(), hr));
+ }
+ assert_eq!(
+ seen_partitions,
+ HashSet::from([
+ ("2024-01-01".into(), 10),
+ ("2024-01-01".into(), 20),
+ ("2024-01-02".into(), 10),
+ ])
+ );
+
+ let mut rows: Vec<(i32, String, String, i32)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let name = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("name");
+ let dt = batch
+ .column_by_name("dt")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("dt");
+ let hr = batch
+ .column_by_name("hr")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("hr");
+ for i in 0..batch.num_rows() {
+ rows.push((
+ id.value(i),
+ name.value(i).into(),
+ dt.value(i).into(),
+ hr.value(i),
+ ));
+ }
+ }
+ rows.sort_by_key(|(id, _, _, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice".into(), "2024-01-01".into(), 10),
+ (2, "bob".into(), "2024-01-01".into(), 10),
+ (3, "carol".into(), "2024-01-01".into(), 20),
+ (4, "dave".into(), "2024-01-02".into(), 10),
+ ]
+ );
+}
+
+#[tokio::test]
+async fn test_read_partitioned_dv_pk_table() {
+ let (plan, batches) = scan_and_read("partitioned_dv_pk_table").await;
+
+ // Verify partition metadata on each split.
+ let mut seen_partitions: HashSet<String> = HashSet::new();
+ for split in plan.splits() {
+ let partition = split.partition();
+ assert_eq!(partition.arity(), 1);
+ assert!(!partition.is_empty());
+ let dt = partition.get_string(0).expect("Failed to decode dt");
+ let expected_suffix = format!("dt={dt}/bucket-{}", split.bucket());
+ assert!(
+ split.bucket_path().ends_with(&expected_suffix),
+ "bucket_path should end with '{expected_suffix}', got: {}",
+ split.bucket_path()
+ );
+ seen_partitions.insert(dt.to_string());
+ }
+ assert_eq!(
+ seen_partitions,
+ HashSet::from(["2024-01-01".into(), "2024-01-02".into()])
+ );
+
+ let mut rows: Vec<(i32, String, String)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let name = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("name");
+ let dt = batch
+ .column_by_name("dt")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("dt");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), name.value(i).into(), dt.value(i).into()));
+ }
+ }
+ rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice-v2".into(), "2024-01-01".into()),
+ (1, "alice-v1".into(), "2024-01-02".into()),
+ (2, "bob-v2".into(), "2024-01-01".into()),
+ (3, "carol-v2".into(), "2024-01-02".into()),
+ (4, "dave-v2".into(), "2024-01-02".into()),
+ ]
+ );
+}
diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index 484ab57..f6164d0 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -20,8 +20,11 @@ use std::collections::HashMap;
const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
+const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
+const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
+const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
/// Typed accessors for common table options.
///
@@ -39,7 +42,7 @@ impl<'a> CoreOptions<'a> {
pub fn deletion_vectors_enabled(&self) -> bool {
self.options
.get(DELETION_VECTORS_ENABLED_OPTION)
- .map(|value| matches!(value.to_ascii_lowercase().as_str(), "true"))
+ .map(|value| value.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
@@ -56,6 +59,27 @@ impl<'a> CoreOptions<'a> {
.and_then(|value| parse_memory_size(value))
.unwrap_or(DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST)
}
+
+ /// The default partition name for null/blank partition values.
+ ///
+ /// Corresponds to Java `CoreOptions.PARTITION_DEFAULT_NAME`.
+ pub fn partition_default_name(&self) -> &str {
+ self.options
+ .get(PARTITION_DEFAULT_NAME_OPTION)
+ .map(String::as_str)
+ .unwrap_or(DEFAULT_PARTITION_DEFAULT_NAME)
+ }
+
+ /// Whether to use legacy partition name formatting (toString semantics).
+ ///
+ /// Corresponds to Java `CoreOptions.PARTITION_GENERATE_LEGACY_NAME`.
+ /// Default: `true` to match Java Paimon.
+ pub fn legacy_partition_name(&self) -> bool {
+ self.options
+ .get(PARTITION_LEGACY_NAME_OPTION)
+ .map(|v| v.eq_ignore_ascii_case("true"))
+ .unwrap_or(true)
+ }
}
/// Parse a memory size string to bytes using binary (1024-based) semantics.
@@ -131,4 +155,29 @@ mod tests {
assert_eq!(parse_memory_size(""), None);
assert_eq!(parse_memory_size("abc"), None);
}
+
+ #[test]
+ fn test_partition_options_defaults() {
+ let options = HashMap::new();
+ let core = CoreOptions::new(&options);
+ assert_eq!(core.partition_default_name(), "__DEFAULT_PARTITION__");
+ assert!(core.legacy_partition_name());
+ }
+
+ #[test]
+ fn test_partition_options_custom() {
+ let options = HashMap::from([
+ (
+ PARTITION_DEFAULT_NAME_OPTION.to_string(),
+ "NULL_PART".to_string(),
+ ),
+ (
+ PARTITION_LEGACY_NAME_OPTION.to_string(),
+ "false".to_string(),
+ ),
+ ]);
+ let core = CoreOptions::new(&options);
+ assert_eq!(core.partition_default_name(), "NULL_PART");
+ assert!(!core.legacy_partition_name());
+ }
}
diff --git a/crates/paimon/src/spec/data_file.rs b/crates/paimon/src/spec/data_file.rs
index 9934e63..8733b1d 100644
--- a/crates/paimon/src/spec/data_file.rs
+++ b/crates/paimon/src/spec/data_file.rs
@@ -83,10 +83,11 @@ impl BinaryRow {
}
}
- /// Create a BinaryRow from raw binary bytes (e.g. from
`ManifestEntry._PARTITION`).
+ /// Create a BinaryRow from raw binary bytes.
///
/// The `data` must contain the full binary row content:
/// header + null bit set + fixed-length part + variable-length part.
+ /// Does NOT include the 4-byte arity prefix (use `from_serialized_bytes`
for that).
pub fn from_bytes(arity: i32, data: Vec<u8>) -> Self {
let null_bits_size_in_bytes = Self::cal_bit_set_width_in_bytes(arity);
Self {
@@ -96,6 +97,28 @@ impl BinaryRow {
}
}
+ /// Create a BinaryRow from Paimon's serialized format (e.g.
`ManifestEntry._PARTITION`).
+ ///
+ /// Java `SerializationUtils.serializeBinaryRow()` prepends a 4-byte
big-endian arity
+ /// before the raw BinaryRow content. This method reads that prefix and
strips it,
+ /// matching Java `SerializationUtils.deserializeBinaryRow()`.
+ ///
+ /// Variable-length field offsets inside the BinaryRow content are
relative to the
+ /// BinaryRow base, so they remain valid after stripping the 4-byte prefix.
+ pub fn from_serialized_bytes(data: &[u8]) -> crate::Result<Self> {
+ if data.len() < 4 {
+ return Err(crate::Error::UnexpectedError {
+ message: format!(
+ "BinaryRow: serialized data too short for arity prefix: {}
bytes",
+ data.len()
+ ),
+ source: None,
+ });
+ }
+ let arity = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
+ Ok(Self::from_bytes(arity, data[4..].to_vec()))
+ }
+
/// Number of fields in this row.
pub fn arity(&self) -> i32 {
self.arity
@@ -591,6 +614,30 @@ mod tests {
assert_eq!(BinaryRow::cal_bit_set_width_in_bytes(57), 16);
}
+ #[test]
+ fn test_from_serialized_bytes() {
+ // Build a raw BinaryRow with arity=1, int value 42, then prepend
4-byte BE arity prefix
+ // to simulate Java SerializationUtils.serializeBinaryRow() format.
+ let mut builder = BinaryRowBuilder::new(1);
+ builder.write_int(0, 42);
+ let raw_row = builder.build();
+ let raw_data = raw_row.data();
+
+ let mut serialized = Vec::with_capacity(4 + raw_data.len());
+ serialized.extend_from_slice(&1_i32.to_be_bytes());
+ serialized.extend_from_slice(raw_data);
+
+ let row = BinaryRow::from_serialized_bytes(&serialized).unwrap();
+ assert_eq!(row.arity(), 1);
+ assert!(!row.is_null_at(0));
+ assert_eq!(row.get_int(0).unwrap(), 42);
+ }
+
+ #[test]
+ fn test_from_serialized_bytes_too_short() {
+ assert!(BinaryRow::from_serialized_bytes(&[0, 0]).is_err());
+ }
+
#[test]
fn test_get_int() {
let mut builder = BinaryRowBuilder::new(2);
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index f1ef422..83af8be 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -54,3 +54,4 @@ pub(crate) mod stats;
mod types;
pub use types::*;
mod partition_utils;
+pub(crate) use partition_utils::PartitionComputer;
diff --git a/crates/paimon/src/spec/partition_utils.rs b/crates/paimon/src/spec/partition_utils.rs
index 418ecfc..fa41cfc 100644
--- a/crates/paimon/src/spec/partition_utils.rs
+++ b/crates/paimon/src/spec/partition_utils.rs
@@ -29,10 +29,6 @@ use crate::spec::BinaryRow;
use crate::spec::DataField;
use chrono::{Local, NaiveDate, NaiveDateTime, TimeZone, Timelike};
-// TODO: remove after #131 consumes the pub(crate) API.
-#[allow(dead_code)]
-pub const DEFAULT_PARTITION_NAME: &str = "__DEFAULT_PARTITION__";
-
const MILLIS_PER_DAY: i64 = 86_400_000;
/// Computes partition string values and directory paths from a partition
`BinaryRow`.
@@ -42,8 +38,6 @@ const MILLIS_PER_DAY: i64 = 86_400_000;
/// (escaped directory path).
///
/// Reference: `org.apache.paimon.utils.InternalRowPartitionComputer` in Java
Paimon.
-// TODO: remove after #131 consumes the pub(crate) API.
-#[allow(dead_code)]
pub(crate) struct PartitionComputer {
partition_keys: Vec<String>,
partition_fields: Vec<DataField>,
@@ -51,7 +45,6 @@ pub(crate) struct PartitionComputer {
legacy_partition_name: bool,
}
-#[allow(dead_code)]
impl PartitionComputer {
/// Create a new `PartitionComputer`.
///
@@ -155,28 +148,6 @@ impl PartitionComputer {
}
}
-/// Backward-compatible free function that delegates to `PartitionComputer`.
-// TODO: remove after #131 consumes the pub(crate) API.
-#[allow(dead_code)]
-pub(crate) fn generate_partition_path(
- partition_keys: &[String],
- schema_fields: &[DataField],
- row: &BinaryRow,
- default_partition_name: &str,
- legacy_partition_name: bool,
-) -> crate::Result<String> {
- if partition_keys.is_empty() {
- return Ok(String::new());
- }
- let computer = PartitionComputer::new(
- partition_keys,
- schema_fields,
- default_partition_name,
- legacy_partition_name,
- )?;
- computer.generate_partition_path(row)
-}
-
/// Resolve the `DataField` for each partition key from the schema fields,
preserving order.
fn resolve_partition_fields<'a>(
partition_keys: &[String],
@@ -622,6 +593,8 @@ mod tests {
DataField::new(0, name.to_string(), data_type)
}
+ const TEST_DEFAULT_PARTITION_NAME: &str = "__DEFAULT_PARTITION__";
+
/// Helper: assert single-column partition path for a given type and row
writer.
fn assert_single_partition<F>(
name: &str,
@@ -634,11 +607,12 @@ mod tests {
{
let fields = vec![make_field(name, data_type)];
let keys = vec![name.to_string()];
+ let computer =
+ PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, legacy).unwrap();
let mut builder = TestRowBuilder::new(1);
write_fn(&mut builder);
let row = builder.build();
- let result =
- generate_partition_path(&keys, &fields, &row,
DEFAULT_PARTITION_NAME, legacy).unwrap();
+ let result = computer.generate_partition_path(&row).unwrap();
assert_eq!(result, expected);
}
@@ -649,12 +623,12 @@ mod tests {
{
let fields = vec![make_field(name, data_type)];
let keys = vec![name.to_string()];
+ let computer =
+ PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, legacy).unwrap();
let mut builder = TestRowBuilder::new(1);
write_fn(&mut builder);
let row = builder.build();
- assert!(
- generate_partition_path(&keys, &fields, &row,
DEFAULT_PARTITION_NAME, legacy).is_err()
- );
+ assert!(computer.generate_partition_path(&row).is_err());
}
// ======================== Escape tests ========================
@@ -689,7 +663,7 @@ mod tests {
];
let keys = vec!["dt".to_string(), "hr".to_string()];
let computer =
- PartitionComputer::new(&keys, &fields, DEFAULT_PARTITION_NAME,
true).unwrap();
+ PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
let mut builder = TestRowBuilder::new(2);
builder.write_string(0, "2024-01-01");
@@ -710,7 +684,7 @@ mod tests {
];
let keys = vec!["dt".to_string(), "hr".to_string()];
let computer =
- PartitionComputer::new(&keys, &fields, DEFAULT_PARTITION_NAME,
true).unwrap();
+ PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
let mut builder = TestRowBuilder::new(2);
builder.write_string(0, "2024-01-01");
@@ -726,7 +700,8 @@ mod tests {
#[test]
fn test_empty_partition_keys() {
let row = BinaryRow::new(0);
- let result = generate_partition_path(&[], &[], &row,
DEFAULT_PARTITION_NAME, true).unwrap();
+ let computer = PartitionComputer::new(&[], &[],
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
+ let result = computer.generate_partition_path(&row).unwrap();
assert_eq!(result, "");
}
@@ -748,14 +723,15 @@ mod tests {
make_field("hr", DataType::Int(IntType::new())),
];
let keys = vec!["dt".to_string(), "hr".to_string()];
+ let computer =
+ PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
let mut builder = TestRowBuilder::new(2);
builder.write_string(0, "2024-01-01");
builder.write_int(1, 12);
let row = builder.build();
- let result =
- generate_partition_path(&keys, &fields, &row,
DEFAULT_PARTITION_NAME, true).unwrap();
+ let result = computer.generate_partition_path(&row).unwrap();
assert_eq!(result, "dt=2024-01-01/hr=12/");
}
@@ -763,13 +739,14 @@ mod tests {
fn test_null_partition_value() {
let fields = vec![make_field("dt",
DataType::VarChar(VarCharType::default()))];
let keys = vec!["dt".to_string()];
+ let computer =
+ PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
let mut builder = TestRowBuilder::new(1);
builder.set_null_at(0);
let row = builder.build();
- let result =
- generate_partition_path(&keys, &fields, &row,
DEFAULT_PARTITION_NAME, true).unwrap();
+ let result = computer.generate_partition_path(&row).unwrap();
assert_eq!(result, "dt=__DEFAULT_PARTITION__/");
}
@@ -952,8 +929,12 @@ mod tests {
builder.write_int(0, 1);
let row = builder.build();
- let result = generate_partition_path(&keys, &fields, &row,
DEFAULT_PARTITION_NAME, true);
- assert!(result.is_err());
+ let result = PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, true);
+ // Construction succeeds (field resolution fails for hr), or path
generation fails due to arity mismatch
+ match result {
+ Err(_) => {} // field resolution failed — expected
+ Ok(computer) =>
assert!(computer.generate_partition_path(&row).is_err()),
+ }
}
#[test]
@@ -961,11 +942,7 @@ mod tests {
let fields = vec![make_field("other", DataType::Int(IntType::new()))];
let keys = vec!["dt".to_string()];
- let mut builder = TestRowBuilder::new(1);
- builder.write_int(0, 1);
- let row = builder.build();
-
- let result = generate_partition_path(&keys, &fields, &row,
DEFAULT_PARTITION_NAME, true);
+ let result = PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, true);
assert!(result.is_err());
}
@@ -1005,9 +982,11 @@ mod tests {
fn test_empty_row_with_partition_keys() {
let fields = vec![make_field("dt", DataType::Int(IntType::new()))];
let keys = vec!["dt".to_string()];
+ let computer =
+ PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
let row = BinaryRow::new(1); // empty backing data
- let result = generate_partition_path(&keys, &fields, &row,
DEFAULT_PARTITION_NAME, true);
+ let result = computer.generate_partition_path(&row);
assert!(result.is_err());
}
@@ -1059,11 +1038,13 @@ mod tests {
fn test_truncated_row_returns_error() {
let fields = vec![make_field("dt", DataType::Int(IntType::new()))];
let keys = vec!["dt".to_string()];
+ let computer =
+ PartitionComputer::new(&keys, &fields,
TEST_DEFAULT_PARTITION_NAME, true).unwrap();
// Create a BinaryRow with arity=1 but truncated backing data (too
short).
let row = BinaryRow::from_bytes(1, vec![0u8; 4]); // needs >= 16 bytes
- let result = generate_partition_path(&keys, &fields, &row,
DEFAULT_PARTITION_NAME, true);
+ let result = computer.generate_partition_path(&row);
assert!(result.is_err());
let msg = result.unwrap_err().to_string();
assert!(msg.contains("too short"), "Expected 'too short' in: {msg}");
diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs
index dded84c..1f640cd 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -90,14 +90,6 @@ impl<'a> TableRead<'a> {
});
}
- if !self.table.schema.partition_keys().is_empty() {
- return Err(Error::Unsupported {
- message: format!(
- "Reading partitioned tables is not yet supported.
Partition keys: {:?}",
- self.table.schema.partition_keys()
- ),
- });
- }
let reader =
ArrowReaderBuilder::new(self.table.file_io.clone()).build(self.read_type().to_vec());
reader.read(data_splits)
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index 5d5bc4a..1c3454a 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -22,7 +22,9 @@
use super::Table;
use crate::io::FileIO;
-use crate::spec::{BinaryRow, CoreOptions, FileKind, IndexManifest,
ManifestEntry, Snapshot};
+use crate::spec::{
+ BinaryRow, CoreOptions, FileKind, IndexManifest, ManifestEntry,
PartitionComputer, Snapshot,
+};
use crate::table::bin_pack::split_for_batch;
use crate::table::source::{DataSplitBuilder, DeletionFile, PartitionBucket,
Plan};
use crate::table::SnapshotManager;
@@ -172,29 +174,16 @@ impl<'a> TableScan<'a> {
Some(s) => s,
None => return Ok(Plan::new(Vec::new())),
};
+ self.plan_snapshot(snapshot).await
+ }
+
+ async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result<Plan> {
+ let file_io = self.table.file_io();
+ let table_path = self.table.location();
let core_options = CoreOptions::new(self.table.schema().options());
+ let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
let target_split_size = core_options.source_split_target_size();
let open_file_cost = core_options.source_split_open_file_cost();
- let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
- Self::plan_snapshot(
- snapshot,
- file_io,
- table_path,
- target_split_size,
- open_file_cost,
- deletion_vectors_enabled,
- )
- .await
- }
-
- async fn plan_snapshot(
- snapshot: Snapshot,
- file_io: &FileIO,
- table_path: &str,
- target_split_size: i64,
- open_file_cost: i64,
- deletion_vectors_enabled: bool,
- ) -> crate::Result<Plan> {
let entries = read_all_manifest_entries(file_io, table_path,
&snapshot).await?;
let entries = filter_manifest_entries(entries,
deletion_vectors_enabled);
let entries = merge_manifest_entries(entries);
@@ -210,13 +199,24 @@ impl<'a> TableScan<'a> {
}
let snapshot_id = snapshot.id();
- let base_path = table_path;
+ let base_path = table_path.trim_end_matches('/');
let mut splits = Vec::new();
+ let partition_keys = self.table.schema().partition_keys();
+ let partition_computer = if !partition_keys.is_empty() {
+ Some(PartitionComputer::new(
+ partition_keys,
+ self.table.schema().fields(),
+ core_options.partition_default_name(),
+ core_options.legacy_partition_name(),
+ )?)
+ } else {
+ None
+ };
+
// Read deletion vector index manifest once (like Java generateSplits
/ scanDvIndex).
let deletion_files_map = if let Some(index_manifest_name) =
snapshot.index_manifest() {
- let index_manifest_path =
- format!("{}/{}", base_path.trim_end_matches('/'),
MANIFEST_DIR);
+ let index_manifest_path = format!("{base_path}/{MANIFEST_DIR}");
let path = format!("{index_manifest_path}/{index_manifest_name}");
let index_entries = IndexManifest::read(file_io, &path).await?;
Some(build_deletion_files_map(&index_entries, base_path))
@@ -225,6 +225,8 @@ impl<'a> TableScan<'a> {
};
for ((partition, bucket), group_entries) in groups {
+ let partition_row = BinaryRow::from_serialized_bytes(&partition)?;
+
let total_buckets = group_entries
.first()
.map(|e| e.total_buckets())
@@ -240,18 +242,20 @@ impl<'a> TableScan<'a> {
})
.collect();
- // todo: consider partitioned table
- let bucket_path = format!("{base_path}/bucket-{bucket}");
+ let bucket_path = if let Some(ref computer) = partition_computer {
+ let partition_path =
computer.generate_partition_path(&partition_row)?;
+ format!("{base_path}/{partition_path}bucket-{bucket}")
+ } else {
+ format!("{base_path}/bucket-{bucket}")
+ };
- // Get the per-bucket deletion file map for looking up by file
name after bin packing.
+ // Original `partition` Vec consumed by PartitionBucket for DV map
lookup.
let per_bucket_deletion_map = deletion_files_map
.as_ref()
.and_then(|map| map.get(&PartitionBucket::new(partition,
bucket)));
let file_groups = split_for_batch(data_files, target_split_size,
open_file_cost);
for file_group in file_groups {
- // Build deletion files list for this specific file group (by
file name lookup),
- // matching Python's _get_deletion_files_for_split.
let data_deletion_files =
per_bucket_deletion_map.map(|per_bucket| {
file_group
.iter()
@@ -261,8 +265,7 @@ impl<'a> TableScan<'a> {
let mut builder = DataSplitBuilder::new()
.with_snapshot(snapshot_id)
- // todo: consider pass real partition
- .with_partition(BinaryRow::new(0))
+ .with_partition(partition_row.clone())
.with_bucket(bucket)
.with_bucket_path(bucket_path.clone())
.with_total_buckets(total_buckets)
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 21a7a52..a813238 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -110,6 +110,92 @@ def main():
"""
)
+ # ===== Partitioned table: single partition key (dt) =====
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS partitioned_log_table (
+ id INT,
+ name STRING,
+ dt STRING
+ ) USING paimon
+ PARTITIONED BY (dt)
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO partitioned_log_table VALUES
+ (1, 'alice', '2024-01-01'),
+ (2, 'bob', '2024-01-01'),
+ (3, 'carol', '2024-01-02')
+ """
+ )
+
+ # ===== Partitioned table: multiple partition keys (dt, hr) =====
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS multi_partitioned_log_table (
+ id INT,
+ name STRING,
+ dt STRING,
+ hr INT
+ ) USING paimon
+ PARTITIONED BY (dt, hr)
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO multi_partitioned_log_table VALUES
+ (1, 'alice', '2024-01-01', 10),
+ (2, 'bob', '2024-01-01', 10),
+ (3, 'carol', '2024-01-01', 20),
+ (4, 'dave', '2024-01-02', 10)
+ """
+ )
+
+ # ===== Partitioned table: PK + DV enabled =====
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS partitioned_dv_pk_table (
+ id INT,
+ name STRING,
+ dt STRING
+ ) USING paimon
+ PARTITIONED BY (dt)
+ TBLPROPERTIES (
+ 'primary-key' = 'id,dt',
+ 'bucket' = '1',
+ 'deletion-vectors.enabled' = 'true'
+ )
+ """
+ )
+
+ spark.sql(
+ """
+ INSERT INTO partitioned_dv_pk_table VALUES
+ (1, 'alice-v1', '2024-01-01'),
+ (2, 'bob-v1', '2024-01-01'),
+ (1, 'alice-v1', '2024-01-02'),
+ (3, 'carol-v1', '2024-01-02')
+ """
+ )
+
+ spark.sql(
+ """
+ INSERT INTO partitioned_dv_pk_table VALUES
+ (1, 'alice-v2', '2024-01-01'),
+ (3, 'carol-v2', '2024-01-02'),
+ (4, 'dave-v1', '2024-01-02')
+ """
+ )
+
+ spark.sql(
+ """
+ INSERT INTO partitioned_dv_pk_table VALUES
+ (2, 'bob-v2', '2024-01-01'),
+ (4, 'dave-v2', '2024-01-02')
+ """
+ )
+
if __name__ == "__main__":
main()