This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5fc1903  feat: Introduce split & plan (#95)
5fc1903 is described below

commit 5fc190300cff5fb631a613dc6b8746c4028c7af5
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Mar 2 21:36:58 2026 +0800

    feat: Introduce split & plan (#95)
---
 crates/paimon/src/error.rs                   |  10 ++
 crates/paimon/src/lib.rs                     |   1 +
 crates/paimon/src/{table.rs => table/mod.rs} |   4 +
 crates/paimon/src/table/source.rs            | 183 +++++++++++++++++++++++++++
 4 files changed, 198 insertions(+)

diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index d49d883..ca06419 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -34,6 +34,16 @@ pub enum Error {
         display("Paimon hitting unsupported error {}", message)
     )]
     Unsupported { message: String },
+    #[snafu(
+        whatever,
+        display("Paimon hitting unexpected error {}: {:?}", message, source)
+    )]
+    UnexpectedError {
+        message: String,
+        /// see https://github.com/shepmaster/snafu/issues/446
+        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync + 
'static>, Some)))]
+        source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
+    },
     #[snafu(
         visibility(pub(crate)),
         display("Paimon data type invalid for {}", message)
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index 5a7fb66..c0353ce 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -24,3 +24,4 @@ pub mod file_index;
 pub mod io;
 pub mod spec;
 mod table;
+pub use table::{DataSplit, Plan, Table};
diff --git a/crates/paimon/src/table.rs b/crates/paimon/src/table/mod.rs
similarity index 97%
rename from crates/paimon/src/table.rs
rename to crates/paimon/src/table/mod.rs
index 718937b..16e5b75 100644
--- a/crates/paimon/src/table.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -17,6 +17,10 @@
 
 //! Table API for Apache Paimon
 
+mod source;
+
+pub use source::{DataSplit, Plan};
+
 use crate::catalog::Identifier;
 use crate::io::FileIO;
 use crate::spec::TableSchema;
diff --git a/crates/paimon/src/table/source.rs 
b/crates/paimon/src/table/source.rs
new file mode 100644
index 0000000..5e29446
--- /dev/null
+++ b/crates/paimon/src/table/source.rs
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table source types: DataSplit, Plan, and related structs.
+//!
+//! Reference: 
[org.apache.paimon.table.source](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/).
+
+#![allow(dead_code)]
+
+use crate::spec::{BinaryRow, DataFileMeta};
+
+// ======================= DataSplit ===============================
+
+/// Input split for reading: partition + bucket + list of data files.
+///
+/// Reference: 
[org.apache.paimon.table.source.DataSplit](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java)
+#[derive(Debug)]
+pub struct DataSplit {
+    snapshot_id: i64,
+    partition: BinaryRow,
+    bucket: i32,
+    bucket_path: String,
+    total_buckets: Option<i32>,
+    data_files: Vec<DataFileMeta>,
+}
+
+impl DataSplit {
+    pub fn snapshot_id(&self) -> i64 {
+        self.snapshot_id
+    }
+    pub fn partition(&self) -> &BinaryRow {
+        &self.partition
+    }
+    pub fn bucket(&self) -> i32 {
+        self.bucket
+    }
+    pub fn bucket_path(&self) -> &str {
+        &self.bucket_path
+    }
+    pub fn total_buckets(&self) -> Option<i32> {
+        self.total_buckets
+    }
+
+    pub fn data_files(&self) -> &[DataFileMeta] {
+        &self.data_files
+    }
+
+    /// Total row count of all data files in this split.
+    pub fn row_count(&self) -> i64 {
+        self.data_files.iter().map(|f| f.row_count).sum()
+    }
+
+    pub fn builder() -> DataSplitBuilder {
+        DataSplitBuilder::new()
+    }
+}
+
+/// Builder for [DataSplit].
+///
+/// Reference: 
[DataSplit.Builder](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java)
+#[derive(Debug)]
+pub struct DataSplitBuilder {
+    snapshot_id: i64,
+    partition: Option<BinaryRow>,
+    bucket: i32,
+    bucket_path: Option<String>,
+    total_buckets: Option<i32>,
+    data_files: Option<Vec<DataFileMeta>>,
+}
+
+impl DataSplitBuilder {
+    pub fn new() -> Self {
+        Self {
+            snapshot_id: -1,
+            partition: None,
+            bucket: -1,
+            bucket_path: None,
+            total_buckets: None,
+            data_files: None,
+        }
+    }
+
+    pub fn with_snapshot(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = snapshot_id;
+        self
+    }
+    pub fn with_partition(mut self, partition: BinaryRow) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+    pub fn with_bucket(mut self, bucket: i32) -> Self {
+        self.bucket = bucket;
+        self
+    }
+    pub fn with_bucket_path(mut self, bucket_path: String) -> Self {
+        self.bucket_path = Some(bucket_path);
+        self
+    }
+    pub fn with_total_buckets(mut self, total_buckets: Option<i32>) -> Self {
+        self.total_buckets = total_buckets;
+        self
+    }
+
+    pub fn build(self) -> crate::Result<DataSplit> {
+        if self.snapshot_id == -1 {
+            return Err(crate::Error::UnexpectedError {
+                message: "DataSplit requires snapshot_id != -1".to_string(),
+                source: None,
+            });
+        }
+        let partition = self
+            .partition
+            .ok_or_else(|| crate::Error::UnexpectedError {
+                message: "DataSplit requires partition".to_string(),
+                source: None,
+            })?;
+        let bucket_path = self
+            .bucket_path
+            .ok_or_else(|| crate::Error::UnexpectedError {
+                message: "DataSplit requires bucket_path".to_string(),
+                source: None,
+            })?;
+        let data_files = self
+            .data_files
+            .ok_or_else(|| crate::Error::UnexpectedError {
+                message: "DataSplit requires data_files".to_string(),
+                source: None,
+            })?;
+        if self.bucket == -1 {
+            return Err(crate::Error::UnexpectedError {
+                message: "DataSplit requires bucket != -1".to_string(),
+                source: None,
+            });
+        }
+        Ok(DataSplit {
+            snapshot_id: self.snapshot_id,
+            partition,
+            bucket: self.bucket,
+            bucket_path,
+            total_buckets: self.total_buckets,
+            data_files,
+        })
+    }
+}
+
+impl Default for DataSplitBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+// ======================= Plan ===============================
+
+/// Read plan: list of splits.
+///
+/// Reference: 
[org.apache.paimon.table.source.PlanImpl](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java)
+#[derive(Debug)]
+pub struct Plan {
+    splits: Vec<DataSplit>,
+}
+
+impl Plan {
+    pub fn new(splits: Vec<DataSplit>) -> Self {
+        Self { splits }
+    }
+    pub fn splits(&self) -> &[DataSplit] {
+        &self.splits
+    }
+}

Reply via email to