This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch release/0.2.x in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
commit cbb73337bb1c40c0e77e0555c2f18b1c6b25966f Author: Shiyan Xu <[email protected]> AuthorDate: Tue Nov 19 21:05:52 2024 -1000 fix: improve api to get file slices splits (#185) --- crates/core/src/table/mod.rs | 36 ++++++++++++++++++++++++++++++++---- crates/datafusion/src/lib.rs | 2 +- python/hudi/__init__.py | 13 +++++++++---- python/hudi/_internal.pyi | 4 ++-- python/src/internal.rs | 4 ++-- python/tests/test_table_read.py | 2 +- 6 files changed, 47 insertions(+), 14 deletions(-) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 0d25db7..4d69bbf 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -66,7 +66,7 @@ //! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); //! let hudi_table = Table::new(base_uri.path()).await.unwrap(); //! let file_slices = hudi_table -//! .split_file_slices(2, &[]) +//! .get_file_slices_splits(2, &[]) //! .await.unwrap(); //! // define every parquet task reader how many slice //! let mut parquet_file_groups: Vec<Vec<String>> = Vec::new(); @@ -186,14 +186,22 @@ impl Table { Ok(Schema::new(partition_fields)) } - /// Split the file into a specified number of parts - pub async fn split_file_slices( + /// Get all the [FileSlice]s in the table. + /// + /// The file slices are split into `n` chunks. + /// + /// If the [AsOfTimestamp] configuration is set, the file slices at the specified timestamp will be returned. + pub async fn get_file_slices_splits( &self, n: usize, filters: &[(&str, &str, &str)], ) -> Result<Vec<Vec<FileSlice>>> { - let n = std::cmp::max(1, n); let file_slices = self.get_file_slices(filters).await?; + if file_slices.is_empty() { + return Ok(Vec::new()); + } + + let n = std::cmp::max(1, n); let chunk_size = (file_slices.len() + n - 1) / n; Ok(file_slices @@ -614,6 +622,26 @@ mod tests { assert_eq!(batches.num_columns(), 21); } + #[tokio::test] + async fn empty_hudi_table_get_file_slices_splits() { + let base_url = TestTable::V6Empty.url(); + + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let file_slices_splits = hudi_table.get_file_slices_splits(2, &[]).await.unwrap(); + assert!(file_slices_splits.is_empty()); + } + + #[tokio::test] + async fn hudi_table_get_file_slices_splits() { + let base_url = TestTable::V6SimplekeygenNonhivestyle.url(); + + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let file_slices_splits = hudi_table.get_file_slices_splits(2, &[]).await.unwrap(); + assert_eq!(file_slices_splits.len(), 2); + assert_eq!(file_slices_splits[0].len(), 2); + assert_eq!(file_slices_splits[1].len(), 1); + } + #[tokio::test] async fn hudi_table_get_file_slices_as_of_timestamps() { let base_url = TestTable::V6Nonpartitioned.url(); diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 6dbc624..739cc90 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -105,7 +105,7 @@ impl TableProvider for HudiDataSource { let file_slices = self .table // TODO: implement supports_filters_pushdown() to pass filters to Hudi table API - .split_file_slices(self.get_input_partitions(), &[]) + .get_file_slices_splits(self.get_input_partitions(), &[]) .await .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?; let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new(); diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py index fae5cab..ae6b9aa 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -16,8 +16,13 @@ # under the License. -from hudi._internal import HudiFileGroupReader as HudiFileGroupReader -from hudi._internal import HudiFileSlice as HudiFileSlice -from hudi._internal import HudiTable as HudiTable +from hudi._internal import HudiFileGroupReader, HudiFileSlice, HudiTable from hudi._internal import __version__ as __version__ -from hudi.table.builder import HudiTableBuilder as HudiTableBuilder +from hudi.table.builder import HudiTableBuilder + +__all__ = [ + "HudiFileGroupReader", + "HudiFileSlice", + "HudiTable", + "HudiTableBuilder", +] diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index 081a229..48a9eef 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -141,11 +141,11 @@ class HudiTable: Dict[str, str]: A dictionary of storage options. """ ... - def split_file_slices( + def get_file_slices_splits( self, n: int, filters: Optional[List[Tuple[str, str, str]]] ) -> List[List[HudiFileSlice]]: """ - Splits the file slices into 'n' parts, optionally filtered by given filters. + Retrieves all file slices in the Hudi table in 'n' splits, optionally filtered by given filters. Parameters: n (int): The number of parts to split the file slices into. diff --git a/python/src/internal.rs b/python/src/internal.rs index 113c6c9..37201cd 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -157,7 +157,7 @@ impl HudiTable { } #[pyo3(signature = (n, filters=None))] - fn split_file_slices( + fn get_file_slices_splits( &self, n: usize, filters: Option<Vec<(String, String, String)>>, @@ -166,7 +166,7 @@ impl HudiTable { py.allow_threads(|| { let file_slices = rt().block_on( self.inner - .split_file_slices(n, vec_to_slice!(filters.unwrap_or_default())), + .get_file_slices_splits(n, vec_to_slice!(filters.unwrap_or_default())), )?; Ok(file_slices .iter() diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index fbed1fd..baebdff 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -80,7 +80,7 @@ def test_read_table_can_read_from_batches(get_sample_table): assert t.num_rows == 1 assert t.num_columns == 11 - file_slices_gen = iter(table.split_file_slices(2)) + file_slices_gen = iter(table.get_file_slices_splits(2)) assert len(next(file_slices_gen)) == 3 assert len(next(file_slices_gen)) == 2
