This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release/0.2.x
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git

commit cbb73337bb1c40c0e77e0555c2f18b1c6b25966f
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Nov 19 21:05:52 2024 -1000

    fix: improve api to get file slices splits (#185)
---
 crates/core/src/table/mod.rs    | 36 ++++++++++++++++++++++++++++++++----
 crates/datafusion/src/lib.rs    |  2 +-
 python/hudi/__init__.py         | 13 +++++++++----
 python/hudi/_internal.pyi       |  4 ++--
 python/src/internal.rs          |  4 ++--
 python/tests/test_table_read.py |  2 +-
 6 files changed, 47 insertions(+), 14 deletions(-)

diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0d25db7..4d69bbf 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -66,7 +66,7 @@
 //!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
 //!     let hudi_table = Table::new(base_uri.path()).await.unwrap();
 //!     let file_slices = hudi_table
-//!             .split_file_slices(2, &[])
+//!             .get_file_slices_splits(2, &[])
 //!             .await.unwrap();
 //!     // define every parquet task reader how many slice
 //!     let mut parquet_file_groups: Vec<Vec<String>> = Vec::new();
@@ -186,14 +186,22 @@ impl Table {
         Ok(Schema::new(partition_fields))
     }
 
-    /// Split the file into a specified number of parts
-    pub async fn split_file_slices(
+    /// Get all the [FileSlice]s in the table.
+    ///
+    /// The file slices are split into `n` chunks.
+    ///
+    /// If the [AsOfTimestamp] configuration is set, the file slices at the 
specified timestamp will be returned.
+    pub async fn get_file_slices_splits(
         &self,
         n: usize,
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<Vec<FileSlice>>> {
-        let n = std::cmp::max(1, n);
         let file_slices = self.get_file_slices(filters).await?;
+        if file_slices.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let n = std::cmp::max(1, n);
         let chunk_size = (file_slices.len() + n - 1) / n;
 
         Ok(file_slices
@@ -614,6 +622,26 @@ mod tests {
         assert_eq!(batches.num_columns(), 21);
     }
 
+    #[tokio::test]
+    async fn empty_hudi_table_get_file_slices_splits() {
+        let base_url = TestTable::V6Empty.url();
+
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let file_slices_splits = hudi_table.get_file_slices_splits(2, 
&[]).await.unwrap();
+        assert!(file_slices_splits.is_empty());
+    }
+
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_splits() {
+        let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
+
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let file_slices_splits = hudi_table.get_file_slices_splits(2, 
&[]).await.unwrap();
+        assert_eq!(file_slices_splits.len(), 2);
+        assert_eq!(file_slices_splits[0].len(), 2);
+        assert_eq!(file_slices_splits[1].len(), 1);
+    }
+
     #[tokio::test]
     async fn hudi_table_get_file_slices_as_of_timestamps() {
         let base_url = TestTable::V6Nonpartitioned.url();
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 6dbc624..739cc90 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -105,7 +105,7 @@ impl TableProvider for HudiDataSource {
         let file_slices = self
             .table
             // TODO: implement supports_filters_pushdown() to pass filters to 
Hudi table API
-            .split_file_slices(self.get_input_partitions(), &[])
+            .get_file_slices_splits(self.get_input_partitions(), &[])
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from 
Hudi table: {}", e)))?;
         let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index fae5cab..ae6b9aa 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -16,8 +16,13 @@
 #  under the License.
 
 
-from hudi._internal import HudiFileGroupReader as HudiFileGroupReader
-from hudi._internal import HudiFileSlice as HudiFileSlice
-from hudi._internal import HudiTable as HudiTable
+from hudi._internal import HudiFileGroupReader, HudiFileSlice, HudiTable
 from hudi._internal import __version__ as __version__
-from hudi.table.builder import HudiTableBuilder as HudiTableBuilder
+from hudi.table.builder import HudiTableBuilder
+
+__all__ = [
+    "HudiFileGroupReader",
+    "HudiFileSlice",
+    "HudiTable",
+    "HudiTableBuilder",
+]
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 081a229..48a9eef 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -141,11 +141,11 @@ class HudiTable:
             Dict[str, str]: A dictionary of storage options.
         """
         ...
-    def split_file_slices(
+    def get_file_slices_splits(
         self, n: int, filters: Optional[List[Tuple[str, str, str]]]
     ) -> List[List[HudiFileSlice]]:
         """
-        Splits the file slices into 'n' parts, optionally filtered by given 
filters.
+        Retrieves all file slices in the Hudi table in 'n' splits, optionally 
filtered by given filters.
 
         Parameters:
             n (int): The number of parts to split the file slices into.
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 113c6c9..37201cd 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -157,7 +157,7 @@ impl HudiTable {
     }
 
     #[pyo3(signature = (n, filters=None))]
-    fn split_file_slices(
+    fn get_file_slices_splits(
         &self,
         n: usize,
         filters: Option<Vec<(String, String, String)>>,
@@ -166,7 +166,7 @@ impl HudiTable {
         py.allow_threads(|| {
             let file_slices = rt().block_on(
                 self.inner
-                    .split_file_slices(n, 
vec_to_slice!(filters.unwrap_or_default())),
+                    .get_file_slices_splits(n, 
vec_to_slice!(filters.unwrap_or_default())),
             )?;
             Ok(file_slices
                 .iter()
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index fbed1fd..baebdff 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -80,7 +80,7 @@ def test_read_table_can_read_from_batches(get_sample_table):
     assert t.num_rows == 1
     assert t.num_columns == 11
 
-    file_slices_gen = iter(table.split_file_slices(2))
+    file_slices_gen = iter(table.get_file_slices_splits(2))
     assert len(next(file_slices_gen)) == 3
     assert len(next(file_slices_gen)) == 2
 

Reply via email to