This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 6c98b09 feat: add blocking APIs for `Table` and `FileGroupReader`
(#321)
6c98b09 is described below
commit 6c98b0931ef4f06278fa2c69af94f4616f5e3518
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Apr 27 00:57:47 2025 -0500
feat: add blocking APIs for `Table` and `FileGroupReader` (#321)
Add blocking versions of the `Table` and `FileGroupReader` APIs in Rust, with
some minor refactoring of API args.
---
crates/core/src/config/util.rs | 5 +
crates/core/src/expr/filter.rs | 11 +-
crates/core/src/file_group/mod.rs | 25 +-
crates/core/src/file_group/reader.rs | 27 +-
crates/core/src/table/mod.rs | 513 +++++++++++++++++++++++------------
crates/core/src/util/mod.rs | 39 ---
crates/datafusion/src/lib.rs | 3 +-
demo/table-api-rust/src/main.rs | 3 +-
python/src/internal.rs | 56 ++--
9 files changed, 423 insertions(+), 259 deletions(-)
diff --git a/crates/core/src/config/util.rs b/crates/core/src/config/util.rs
index 33921c6..44879b5 100644
--- a/crates/core/src/config/util.rs
+++ b/crates/core/src/config/util.rs
@@ -29,6 +29,11 @@ pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a
str)> {
std::iter::empty::<(&str, &str)>()
}
+/// Returns an empty iterator to represent an empty set of filters.
+pub fn empty_filters<'a>() -> std::iter::Empty<(&'a str, &'a str, &'a str)> {
+ std::iter::empty::<(&str, &str, &str)>()
+}
+
/// Splits the given options into two maps: one for Hudi options, and the
other for others, which could be storage options for example.
pub fn split_hudi_options_from_others<I, K, V>(
all_options: I,
diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 847a2f5..b9cb7a6 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -72,8 +72,15 @@ impl TryFrom<(&str, &str, &str)> for Filter {
}
}
-pub fn from_str_tuples(tuples: &[(&str, &str, &str)]) -> Result<Vec<Filter>> {
- tuples.iter().map(|t| Filter::try_from(*t)).collect()
+pub fn from_str_tuples<I, S>(tuples: I) -> Result<Vec<Filter>>
+where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+{
+ tuples
+ .into_iter()
+ .map(|t| Filter::try_from((t.0.as_ref(), t.1.as_ref(), t.2.as_ref())))
+ .collect()
}
pub struct FilterField {
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index cff9a78..cf5a4ae 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -87,13 +87,11 @@ impl FileGroup {
}
/// Create a new [FileGroup] with a [BaseFile]'s file name.
- pub fn new_with_base_file_name(
- id: String,
- partition_path: String,
- file_name: &str,
- ) -> Result<Self> {
- let mut file_group = Self::new(id, partition_path);
- file_group.add_base_file_from_name(file_name)?;
+ pub fn new_with_base_file_name(file_name: &str, partition_path: &str) ->
Result<Self> {
+ let base_file = BaseFile::from_str(file_name)?;
+ let file_id = base_file.file_id.clone();
+ let mut file_group = Self::new(file_id, partition_path.to_string());
+ file_group.add_base_file(base_file)?;
Ok(file_group)
}
@@ -156,6 +154,19 @@ impl FileGroup {
self.add_log_file(log_file)
}
+ /// Add multiple [LogFile]s based on the file names to the corresponding
[FileSlice]s in the
+ /// [FileGroup].
+ pub fn add_log_files_from_names<I, S>(&mut self, log_file_names: I) ->
Result<&Self>
+ where
+ I: IntoIterator<Item = S>,
+ S: AsRef<str>,
+ {
+ for file_name in log_file_names {
+ self.add_log_file_from_name(file_name.as_ref())?;
+ }
+ Ok(self)
+ }
+
/// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup].
///
/// TODO: support adding log files to file group without base files.
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index 69f9cf4..5d79ce8 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -168,6 +168,17 @@ impl FileGroupReader {
}
}
+ /// Same as [FileGroupReader::read_file_slice_by_base_file_path], but
blocking.
+ pub fn read_file_slice_by_base_file_path_blocking(
+ &self,
+ relative_path: &str,
+ ) -> Result<RecordBatch> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(self.read_file_slice_by_base_file_path(relative_path))
+ }
+
fn create_instant_range_for_log_file_scan(&self) -> InstantRange {
let timezone = self
.hudi_configs
@@ -225,6 +236,14 @@ impl FileGroupReader {
merger.merge_record_batches(&schema, &all_record_batches)
}
}
+
+ /// Same as [FileGroupReader::read_file_slice], but blocking.
+ pub fn read_file_slice_blocking(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(self.read_file_slice(file_slice))
+ }
}
#[cfg(test)]
@@ -249,14 +268,12 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_read_file_slice_returns_error() {
+ #[test]
+ fn test_read_file_slice_returns_error() {
let reader =
FileGroupReader::new_with_options("file:///non-existent-path/table",
empty_options())
.unwrap();
- let result = reader
- .read_file_slice_by_base_file_path("non_existent_file")
- .await;
+ let result =
reader.read_file_slice_by_base_file_path_blocking("non_existent_file");
assert!(matches!(result.unwrap_err(), ReadFileSliceError(_)));
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e47ff9b..acb5286 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -47,18 +47,20 @@
//! 3. read hudi table
//! ```rust
//! use url::Url;
+//! use hudi_core::config::util::empty_filters;
//! use hudi_core::table::Table;
//!
//! pub async fn test() {
//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
-//! let record_read = hudi_table.read_snapshot(&[]).await.unwrap();
+//! let record_read =
hudi_table.read_snapshot(empty_filters()).await.unwrap();
//! }
//! ```
//! 4. get file slice
//! Users can obtain metadata to customize reading methods, read in
batches, perform parallel reads, and more.
//! ```rust
//! use url::Url;
+//! use hudi_core::config::util::empty_filters;
//! use hudi_core::table::Table;
//! use hudi_core::storage::util::parse_uri;
//! use hudi_core::storage::util::join_url_segments;
@@ -67,7 +69,7 @@
//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
//! let file_slices = hudi_table
-//! .get_file_slices_splits(2, &[])
+//! .get_file_slices_splits(2, empty_filters())
//! .await.unwrap();
//! // define every parquet task reader how many slice
//! let mut parquet_file_groups: Vec<Vec<String>> = Vec::new();
@@ -124,6 +126,14 @@ impl Table {
TableBuilder::from_base_uri(base_uri).build().await
}
+ /// Same as [Table::new], but blocking.
+ pub fn new_blocking(base_uri: &str) -> Result<Self> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { Table::new(base_uri).await })
+ }
+
/// Create hudi table with options
pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
where
@@ -137,6 +147,19 @@ impl Table {
.await
}
+ /// Same as [Table::new_with_options], but blocking.
+ pub fn new_with_options_blocking<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { Table::new_with_options(base_uri, options).await
})
+ }
+
pub fn hudi_options(&self) -> HashMap<String, String> {
self.hudi_configs.as_options()
}
@@ -198,11 +221,27 @@ impl Table {
self.timeline.get_latest_avro_schema().await
}
+ /// Same as [Table::get_avro_schema], but blocking.
+ pub fn get_avro_schema_blocking(&self) -> Result<String> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.get_avro_schema().await })
+ }
+
/// Get the latest [arrow_schema::Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
}
+ /// Same as [Table::get_schema], but blocking.
+ pub fn get_schema_blocking(&self) -> Result<Schema> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.get_schema().await })
+ }
+
/// Get the latest partition [arrow_schema::Schema] of the table.
pub async fn get_partition_schema(&self) -> Result<Schema> {
let partition_fields: HashSet<String> = self
@@ -223,6 +262,14 @@ impl Table {
Ok(Schema::new(partition_fields))
}
+ /// Same as [Table::get_partition_schema], but blocking.
+ pub fn get_partition_schema_blocking(&self) -> Result<Schema> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.get_partition_schema().await })
+ }
+
/// Get the [Timeline] of the table.
pub fn get_timeline(&self) -> &Timeline {
&self.timeline
@@ -233,11 +280,15 @@ impl Table {
/// # Arguments
/// * `n` - The number of chunks to split the file slices into.
/// * `filters` - Partition filters to apply.
- pub async fn get_file_slices_splits(
+ pub async fn get_file_slices_splits<I, S>(
&self,
n: usize,
- filters: &[(&str, &str, &str)],
- ) -> Result<Vec<Vec<FileSlice>>> {
+ filters: I,
+ ) -> Result<Vec<Vec<FileSlice>>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() {
let filters = from_str_tuples(filters)?;
self.get_file_slices_splits_internal(n, timestamp, &filters)
@@ -247,23 +298,63 @@ impl Table {
}
}
+ /// Same as [Table::get_file_slices_splits], but blocking.
+ pub fn get_file_slices_splits_blocking<I, S>(
+ &self,
+ n: usize,
+ filters: I,
+ ) -> Result<Vec<Vec<FileSlice>>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.get_file_slices_splits(n, filters).await })
+ }
+
/// Get all the [FileSlice]s in splits from the table at a given timestamp.
///
/// # Arguments
/// * `n` - The number of chunks to split the file slices into.
/// * `timestamp` - The timestamp which file slices associated with.
/// * `filters` - Partition filters to apply.
- pub async fn get_file_slices_splits_as_of(
+ pub async fn get_file_slices_splits_as_of<I, S>(
&self,
n: usize,
timestamp: &str,
- filters: &[(&str, &str, &str)],
- ) -> Result<Vec<Vec<FileSlice>>> {
+ filters: I,
+ ) -> Result<Vec<Vec<FileSlice>>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
let filters = from_str_tuples(filters)?;
self.get_file_slices_splits_internal(n, timestamp, &filters)
.await
}
+ /// Same as [Table::get_file_slices_splits_as_of], but blocking.
+ pub fn get_file_slices_splits_as_of_blocking<I, S>(
+ &self,
+ n: usize,
+ timestamp: &str,
+ filters: I,
+ ) -> Result<Vec<Vec<FileSlice>>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async {
+ self.get_file_slices_splits_as_of(n, timestamp, filters)
+ .await
+ })
+ }
+
async fn get_file_slices_splits_internal(
&self,
n: usize,
@@ -291,7 +382,11 @@ impl Table {
///
/// # Notes
/// * This API is useful for implementing snapshot query.
- pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<FileSlice>> {
+ pub async fn get_file_slices<I, S>(&self, filters: I) ->
Result<Vec<FileSlice>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() {
let filters = from_str_tuples(filters)?;
self.get_file_slices_internal(timestamp, &filters).await
@@ -300,6 +395,18 @@ impl Table {
}
}
+ /// Same as [Table::get_file_slices], but blocking.
+ pub fn get_file_slices_blocking<I, S>(&self, filters: I) ->
Result<Vec<FileSlice>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.get_file_slices(filters).await })
+ }
+
/// Get all the [FileSlice]s in the table at a given timestamp.
///
/// # Arguments
@@ -308,15 +415,35 @@ impl Table {
///
/// # Notes
/// * This API is useful for implementing time travel query.
- pub async fn get_file_slices_as_of(
+ pub async fn get_file_slices_as_of<I, S>(
&self,
timestamp: &str,
- filters: &[(&str, &str, &str)],
- ) -> Result<Vec<FileSlice>> {
+ filters: I,
+ ) -> Result<Vec<FileSlice>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
let filters = from_str_tuples(filters)?;
self.get_file_slices_internal(timestamp, &filters).await
}
+ /// Same as [Table::get_file_slices_as_of], but blocking.
+ pub fn get_file_slices_as_of_blocking<I, S>(
+ &self,
+ timestamp: &str,
+ filters: I,
+ ) -> Result<Vec<FileSlice>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.get_file_slices_as_of(timestamp,
filters).await })
+ }
+
async fn get_file_slices_internal(
&self,
timestamp: &str,
@@ -360,6 +487,21 @@ impl Table {
self.get_file_slices_between_internal(start, end).await
}
+ /// Same as [Table::get_file_slices_between], but blocking.
+ pub fn get_file_slices_between_blocking(
+ &self,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<FileSlice>> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async {
+ self.get_file_slices_between(start_timestamp, end_timestamp)
+ .await
+ })
+ }
+
async fn get_file_slices_between_internal(
&self,
start_timestamp: &str,
@@ -406,7 +548,11 @@ impl Table {
///
/// # Arguments
/// * `filters` - Partition filters to apply.
- pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<RecordBatch>> {
+ pub async fn read_snapshot<I, S>(&self, filters: I) ->
Result<Vec<RecordBatch>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() {
let filters = from_str_tuples(filters)?;
self.read_snapshot_internal(timestamp, &filters).await
@@ -415,20 +561,52 @@ impl Table {
}
}
+ /// Same as [Table::read_snapshot], but blocking.
+ pub fn read_snapshot_blocking<I, S>(&self, filters: I) ->
Result<Vec<RecordBatch>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.read_snapshot(filters).await })
+ }
+
/// Get all the records in the table at a given timestamp.
///
/// # Arguments
/// * `timestamp` - The timestamp which records associated with.
/// * `filters` - Partition filters to apply.
- pub async fn read_snapshot_as_of(
+ pub async fn read_snapshot_as_of<I, S>(
&self,
timestamp: &str,
- filters: &[(&str, &str, &str)],
- ) -> Result<Vec<RecordBatch>> {
+ filters: I,
+ ) -> Result<Vec<RecordBatch>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
let filters = from_str_tuples(filters)?;
self.read_snapshot_internal(timestamp, &filters).await
}
+ /// Same as [Table::read_snapshot_as_of], but blocking.
+ pub fn read_snapshot_as_of_blocking<I, S>(
+ &self,
+ timestamp: &str,
+ filters: I,
+ ) -> Result<Vec<RecordBatch>>
+ where
+ I: IntoIterator<Item = (S, S, S)>,
+ S: AsRef<str>,
+ {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.read_snapshot_as_of(timestamp,
filters).await })
+ }
+
async fn read_snapshot_internal(
&self,
timestamp: &str,
@@ -480,6 +658,21 @@ impl Table {
Ok(batches)
}
+ /// Same as [Table::read_incremental_records], but blocking.
+ pub fn read_incremental_records_blocking(
+ &self,
+ start_timestamp: &str,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<RecordBatch>> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async {
+ self.read_incremental_records(start_timestamp, end_timestamp)
+ .await
+ })
+ }
+
/// Get the change-data-capture (CDC) records between the given timestamps.
///
/// The CDC records should reflect the records that were inserted,
updated, and deleted
@@ -503,7 +696,7 @@ mod tests {
PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
TimelineLayoutVersion, TimelineTimezone,
};
- use crate::config::util::empty_options;
+ use crate::config::util::{empty_filters, empty_options};
use crate::config::HUDI_CONF_DIR;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
@@ -518,27 +711,26 @@ mod tests {
/// # Arguments
///
/// * `table_dir_name` - Name of the table root directory; all under
`crates/core/tests/data/`.
- async fn get_test_table_without_validation(table_dir_name: &str) -> Table {
+ fn get_test_table_without_validation(table_dir_name: &str) -> Table {
let base_url = Url::from_file_path(
canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(),
)
.unwrap();
- Table::new_with_options(
+ Table::new_with_options_blocking(
base_url.as_str(),
[("hoodie.internal.skip.config.validation", "true")],
)
- .await
.unwrap()
}
/// Test helper to get relative file paths from the table with filters.
- async fn get_file_paths_with_filters(
+ fn get_file_paths_with_filters(
table: &Table,
filters: &[(&str, &str, &str)],
) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
let base_url = table.base_url();
- for f in table.get_file_slices(filters).await? {
+ for f in table.get_file_slices_blocking(filters.to_vec())? {
let relative_path = f.base_file_relative_path()?;
let file_url = join_url_segments(&base_url,
&[relative_path.as_str()])?;
file_paths.push(file_url.to_string());
@@ -546,10 +738,10 @@ mod tests {
Ok(file_paths)
}
- #[tokio::test]
- async fn test_hudi_table_get_hudi_options() {
+ #[test]
+ fn test_hudi_table_get_hudi_options() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
let hudi_options = hudi_table.hudi_options();
for (k, v) in hudi_options.iter() {
assert!(k.starts_with("hoodie."));
@@ -557,10 +749,10 @@ mod tests {
}
}
- #[tokio::test]
- async fn test_hudi_table_get_storage_options() {
+ #[test]
+ fn test_hudi_table_get_storage_options() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES
.iter()
@@ -584,13 +776,12 @@ mod tests {
}
}
- #[tokio::test]
- async fn hudi_table_get_schema() {
+ #[test]
+ fn hudi_table_get_schema() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
let fields: Vec<String> = hudi_table
- .get_schema()
- .await
+ .get_schema_blocking()
.unwrap()
.flattened_fields()
.into_iter()
@@ -637,13 +828,12 @@ mod tests {
);
}
- #[tokio::test]
- async fn hudi_table_get_partition_schema() {
+ #[test]
+ fn hudi_table_get_partition_schema() {
let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
let fields: Vec<String> = hudi_table
- .get_partition_schema()
- .await
+ .get_partition_schema_blocking()
.unwrap()
.flattened_fields()
.into_iter()
@@ -652,9 +842,9 @@ mod tests {
assert_eq!(fields, vec!["ts_str"]);
}
- #[tokio::test]
- async fn validate_invalid_table_props() {
- let table =
get_test_table_without_validation("table_props_invalid").await;
+ #[test]
+ fn validate_invalid_table_props() {
+ let table = get_test_table_without_validation("table_props_invalid");
let configs = table.hudi_configs;
assert!(
configs.validate(BaseFileFormat).is_err(),
@@ -704,9 +894,9 @@ mod tests {
);
}
- #[tokio::test]
- async fn get_invalid_table_props() {
- let table =
get_test_table_without_validation("table_props_invalid").await;
+ #[test]
+ fn get_invalid_table_props() {
+ let table = get_test_table_without_validation("table_props_invalid");
let configs = table.hudi_configs;
assert!(configs.get(BaseFileFormat).is_err());
assert!(configs.get(Checksum).is_err());
@@ -726,9 +916,9 @@ mod tests {
assert!(configs.get(TimelineTimezone).is_err());
}
- #[tokio::test]
- async fn get_default_for_invalid_table_props() {
- let table =
get_test_table_without_validation("table_props_invalid").await;
+ #[test]
+ fn get_default_for_invalid_table_props() {
+ let table = get_test_table_without_validation("table_props_invalid");
let configs = table.hudi_configs;
assert_eq!(
configs.get_or_default(BaseFileFormat).to::<String>(),
@@ -763,9 +953,9 @@ mod tests {
);
}
- #[tokio::test]
- async fn get_valid_table_props() {
- let table =
get_test_table_without_validation("table_props_valid").await;
+ #[test]
+ fn get_valid_table_props() {
+ let table = get_test_table_without_validation("table_props_valid");
let configs = table.hudi_configs;
assert_eq!(
configs.get(BaseFileFormat).unwrap().to::<String>(),
@@ -803,10 +993,10 @@ mod tests {
);
}
- #[tokio::test]
- async fn get_global_table_props() {
+ #[test]
+ fn get_global_table_props() {
// Without the environment variable HUDI_CONF_DIR
- let table =
get_test_table_without_validation("table_props_partial").await;
+ let table = get_test_table_without_validation("table_props_partial");
let configs = table.hudi_configs;
assert!(configs.get(DatabaseName).is_err());
assert!(configs.get(TableType).is_err());
@@ -816,7 +1006,7 @@ mod tests {
let base_path = env::current_dir().unwrap();
let hudi_conf_dir = base_path.join("random/wrong/dir");
env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
- let table =
get_test_table_without_validation("table_props_partial").await;
+ let table = get_test_table_without_validation("table_props_partial");
let configs = table.hudi_configs;
assert!(configs.get(DatabaseName).is_err());
assert!(configs.get(TableType).is_err());
@@ -826,7 +1016,7 @@ mod tests {
let base_path = env::current_dir().unwrap();
let hudi_conf_dir = base_path.join("tests/data/hudi_conf_dir");
env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
- let table =
get_test_table_without_validation("table_props_partial").await;
+ let table = get_test_table_without_validation("table_props_partial");
let configs = table.hudi_configs;
assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "tmpdb");
assert_eq!(
@@ -837,52 +1027,54 @@ mod tests {
env::remove_var(HUDI_CONF_DIR)
}
- #[tokio::test]
- async fn hudi_table_read_file_slice() {
+ #[test]
+ fn hudi_table_read_file_slice() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
let batches = hudi_table
.create_file_group_reader_with_options(empty_options())
.unwrap()
- .read_file_slice_by_base_file_path(
+ .read_file_slice_by_base_file_path_blocking(
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
)
- .await
.unwrap();
assert_eq!(batches.num_rows(), 4);
assert_eq!(batches.num_columns(), 21);
}
- #[tokio::test]
- async fn empty_hudi_table_get_file_slices_splits() {
+ #[test]
+ fn empty_hudi_table_get_file_slices_splits() {
let base_url = SampleTable::V6Empty.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
- let file_slices_splits = hudi_table.get_file_slices_splits(2,
&[]).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let file_slices_splits = hudi_table
+ .get_file_slices_splits_blocking(2, empty_filters())
+ .unwrap();
assert!(file_slices_splits.is_empty());
}
- #[tokio::test]
- async fn hudi_table_get_file_slices_splits() {
+ #[test]
+ fn hudi_table_get_file_slices_splits() {
let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
- let file_slices_splits = hudi_table.get_file_slices_splits(2,
&[]).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let file_slices_splits = hudi_table
+ .get_file_slices_splits_blocking(2, empty_filters())
+ .unwrap();
assert_eq!(file_slices_splits.len(), 2);
assert_eq!(file_slices_splits[0].len(), 2);
assert_eq!(file_slices_splits[1].len(), 1);
}
- #[tokio::test]
- async fn hudi_table_get_file_slices_splits_as_of_timestamps() {
+ #[test]
+ fn hudi_table_get_file_slices_splits_as_of_timestamps() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
// before replacecommit (insert overwrite table)
let second_latest_timestamp = "20250121000656060";
let file_slices_splits = hudi_table
- .get_file_slices_splits_as_of(2, second_latest_timestamp, &[])
- .await
+ .get_file_slices_splits_as_of_blocking(2, second_latest_timestamp,
empty_filters())
.unwrap();
assert_eq!(file_slices_splits.len(), 2);
assert_eq!(file_slices_splits[0].len(), 2);
@@ -915,19 +1107,20 @@ mod tests {
// as of replacecommit (insert overwrite table)
let latest_timestamp = "20250121000702475";
let file_slices_splits = hudi_table
- .get_file_slices_splits_as_of(2, latest_timestamp, &[])
- .await
+ .get_file_slices_splits_as_of_blocking(2, latest_timestamp,
empty_filters())
.unwrap();
assert_eq!(file_slices_splits.len(), 1);
assert_eq!(file_slices_splits[0].len(), 1);
}
- #[tokio::test]
- async fn hudi_table_get_file_slices_as_of_timestamps() {
+ #[test]
+ fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
- let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let file_slices = hudi_table
+ .get_file_slices_blocking(empty_filters())
+ .unwrap();
assert_eq!(
file_slices
.iter()
@@ -937,10 +1130,9 @@ mod tests {
);
// as of the latest timestamp
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
let file_slices = hudi_table
- .get_file_slices_as_of("20240418173551906", &[])
- .await
+ .get_file_slices_as_of_blocking("20240418173551906",
empty_filters())
.unwrap();
assert_eq!(
file_slices
@@ -951,12 +1143,10 @@ mod tests {
);
// as of just smaller than the latest timestamp
- let hudi_table = Table::new_with_options(base_url.path(),
empty_options())
- .await
- .unwrap();
+ let hudi_table =
+ Table::new_with_options_blocking(base_url.path(),
empty_options()).unwrap();
let file_slices = hudi_table
- .get_file_slices_as_of("20240418173551905", &[])
- .await
+ .get_file_slices_as_of_blocking("20240418173551905",
empty_filters())
.unwrap();
assert_eq!(
file_slices
@@ -967,12 +1157,10 @@ mod tests {
);
// as of non-exist old timestamp
- let hudi_table = Table::new_with_options(base_url.path(),
empty_options())
- .await
- .unwrap();
+ let hudi_table =
+ Table::new_with_options_blocking(base_url.path(),
empty_options()).unwrap();
let file_slices = hudi_table
- .get_file_slices_as_of("19700101000000", &[])
- .await
+ .get_file_slices_as_of_blocking("19700101000000", empty_filters())
.unwrap();
assert_eq!(
file_slices
@@ -983,24 +1171,22 @@ mod tests {
);
}
- #[tokio::test]
- async fn empty_hudi_table_get_file_slices_between_timestamps() {
+ #[test]
+ fn empty_hudi_table_get_file_slices_between_timestamps() {
let base_url = SampleTable::V6Empty.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
let file_slices = hudi_table
- .get_file_slices_between(Some(EARLIEST_START_TIMESTAMP), None)
- .await
+ .get_file_slices_between_blocking(Some(EARLIEST_START_TIMESTAMP),
None)
.unwrap();
assert!(file_slices.is_empty())
}
- #[tokio::test]
- async fn hudi_table_get_file_slices_between_timestamps() {
+ #[test]
+ fn hudi_table_get_file_slices_between_timestamps() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
let mut file_slices = hudi_table
- .get_file_slices_between(None, Some("20250121000656060"))
- .await
+ .get_file_slices_between_blocking(None, Some("20250121000656060"))
.unwrap();
assert_eq!(file_slices.len(), 3);
@@ -1031,15 +1217,14 @@ mod tests {
assert!(file_slice_2.log_files.is_empty());
}
- #[tokio::test]
- async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
+ #[test]
+ fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
let partition_filters = &[];
let actual = get_file_paths_with_filters(&hudi_table,
partition_filters)
- .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1055,7 +1240,6 @@ mod tests {
let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
- .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1069,7 +1253,6 @@ mod tests {
assert_eq!(actual, expected);
let actual = get_file_paths_with_filters(&hudi_table, &[("byteField",
">", "30")])
- .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1077,15 +1260,14 @@ mod tests {
assert_eq!(actual, expected);
}
- #[tokio::test]
- async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
+ #[test]
+ fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
let partition_filters = &[];
let actual = get_file_paths_with_filters(&hudi_table,
partition_filters)
- .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1105,7 +1287,6 @@ mod tests {
("shortField", "!=", "100"),
];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
- .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1119,7 +1300,6 @@ mod tests {
let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
- .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1129,28 +1309,29 @@ mod tests {
mod test_snapshot_and_time_travel_queries {
use super::super::*;
+ use crate::config::util::empty_filters;
use arrow::compute::concat_batches;
use hudi_test::{QuickstartTripsTable, SampleTable};
- #[tokio::test]
- async fn test_empty() -> Result<()> {
+ #[test]
+ fn test_empty() -> Result<()> {
for base_url in SampleTable::V6Empty.urls() {
- let hudi_table = Table::new(base_url.path()).await?;
- let records = hudi_table.read_snapshot(&[]).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
+ let records =
hudi_table.read_snapshot_blocking(empty_filters())?;
assert!(records.is_empty());
}
Ok(())
}
- #[tokio::test]
- async fn test_quickstart_trips_table() -> Result<()> {
+ #[test]
+ fn test_quickstart_trips_table() -> Result<()> {
let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
- let hudi_table = Table::new(base_url.path()).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
let updated_rider = "rider-D";
// verify updated record as of the latest commit
- let records = hudi_table.read_snapshot(&[]).await?;
+ let records = hudi_table.read_snapshot_blocking(empty_filters())?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let uuid_rider_and_fare =
QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -1172,7 +1353,7 @@ mod tests {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
- let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?;
+ let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let uuid_rider_and_fare =
QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -1189,11 +1370,11 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_non_partitioned() -> Result<()> {
+ #[test]
+ fn test_non_partitioned() -> Result<()> {
for base_url in SampleTable::V6Nonpartitioned.urls() {
- let hudi_table = Table::new(base_url.path()).await?;
- let records = hudi_table.read_snapshot(&[]).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
+ let records = hudi_table.read_snapshot_blocking(empty_filters())?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -1211,14 +1392,13 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_non_partitioned_read_optimized() -> Result<()> {
+ #[test]
+ fn test_non_partitioned_read_optimized() -> Result<()> {
let base_url = SampleTable::V6Nonpartitioned.url_to_mor_parquet();
- let hudi_table = Table::new_with_options(
+ let hudi_table = Table::new_with_options_blocking(
base_url.path(),
[(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
- )
- .await?;
+ )?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
@@ -1226,7 +1406,8 @@ mod tests {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let latest_commit = commit_timestamps.last().unwrap();
- let records = hudi_table.read_snapshot_as_of(latest_commit, &[]).await?;
+ let records =
+ hudi_table.read_snapshot_as_of_blocking(latest_commit, empty_filters())?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -1243,11 +1424,11 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_non_partitioned_rollback() -> Result<()> {
+ #[test]
+ fn test_non_partitioned_rollback() -> Result<()> {
let base_url = SampleTable::V6NonpartitionedRollback.url_to_mor_parquet();
- let hudi_table = Table::new(base_url.path()).await?;
- let records = hudi_table.read_snapshot(&[]).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
+ let records = hudi_table.read_snapshot_blocking(empty_filters())?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -1263,17 +1444,17 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
+ #[test]
+ fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
- let hudi_table = Table::new(base_url.path()).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
- let filters = &[
+ let filters = vec![
("byteField", ">=", "10"),
("byteField", "<", "20"),
("shortField", "!=", "100"),
];
- let records = hudi_table.read_snapshot(filters).await?;
+ let records = hudi_table.read_snapshot_blocking(filters)?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -1283,10 +1464,10 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
+ #[test]
+ fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
for base_url in SampleTable::V6SimplekeygenNonhivestyle.urls() {
- let hudi_table = Table::new(base_url.path()).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
@@ -1294,7 +1475,8 @@ mod tests {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
- let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?;
+ let records =
+ hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -1307,11 +1489,11 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
+ #[test]
+ fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
for base_url in SampleTable::V6SimplekeygenHivestyleNoMetafields.urls() {
- let hudi_table = Table::new(base_url.path()).await?;
- let records = hudi_table.read_snapshot(&[]).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
+ let records = hudi_table.read_snapshot_blocking(empty_filters())?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -1335,20 +1517,20 @@ mod tests {
use arrow_select::concat::concat_batches;
use hudi_test::SampleTable;
- #[tokio::test]
- async fn test_empty() -> Result<()> {
+ #[test]
+ fn test_empty() -> Result<()> {
for base_url in SampleTable::V6Empty.urls() {
- let hudi_table = Table::new(base_url.path()).await?;
- let records = hudi_table.read_incremental_records("0", None).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
+ let records = hudi_table.read_incremental_records_blocking("0", None)?;
assert!(records.is_empty())
}
Ok(())
}
- #[tokio::test]
- async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
+ #[test]
+ fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
for base_url in SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
- let hudi_table = Table::new(base_url.path()).await?;
+ let hudi_table = Table::new_blocking(base_url.path())?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
@@ -1362,8 +1544,7 @@ mod tests {
// read records changed from the beginning to the 1st commit
let records = hudi_table
- .read_incremental_records("19700101000000", Some(first_commit))
- .await?;
+ .read_incremental_records_blocking("19700101000000", Some(first_commit))?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data =
SampleTable::sample_data_order_by_id(&records);
@@ -1375,8 +1556,7 @@ mod tests {
// read records changed from the 1st to the 2nd commit
let records = hudi_table
- .read_incremental_records(first_commit, Some(second_commit))
- .await?;
+ .read_incremental_records_blocking(first_commit, Some(second_commit))?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data =
SampleTable::sample_data_order_by_id(&records);
@@ -1388,8 +1568,7 @@ mod tests {
// read records changed from the 2nd to the 3rd commit
let records = hudi_table
- .read_incremental_records(second_commit, Some(third_commit))
- .await?;
+ .read_incremental_records_blocking(second_commit, Some(third_commit))?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data =
SampleTable::sample_data_order_by_id(&records);
@@ -1400,9 +1579,7 @@ mod tests {
);
// read records changed from the 1st commit
- let records = hudi_table
- .read_incremental_records(first_commit, None)
- .await?;
+ let records = hudi_table.read_incremental_records_blocking(first_commit, None)?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data =
SampleTable::sample_data_order_by_id(&records);
@@ -1413,9 +1590,7 @@ mod tests {
);
// read records changed from the 3rd commit
- let records = hudi_table
- .read_incremental_records(third_commit, None)
- .await?;
+ let records = hudi_table.read_incremental_records_blocking(third_commit, None)?;
assert!(
records.is_empty(),
"Should return 0 record as it's the latest commit"
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 21306c9..68bc85e 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -17,42 +17,3 @@
* under the License.
*/
pub mod arrow;
-
-pub trait StrTupleRef {
- fn as_strs(&self) -> Vec<(&str, &str, &str)>;
-}
-
-impl StrTupleRef for Vec<(String, String, String)> {
- fn as_strs(&self) -> Vec<(&str, &str, &str)> {
- self.iter()
- .map(|(s1, s2, s3)| (s1.as_str(), s2.as_str(), s3.as_str()))
- .collect()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_convert_vec_of_string_to_vec_of_str_slice() {
- let vec_of_strings = vec![
- (
- String::from("date"),
- String::from("="),
- String::from("2022-01-02"),
- ),
- (
- String::from("foo"),
- String::from("bar"),
- String::from("baz"),
- ),
- ];
- let binding = vec_of_strings.as_strs();
- let str_slice = &binding[..];
- assert_eq!(
- str_slice,
- [("date", "=", "2022-01-02"), ("foo", "bar", "baz")]
- );
- }
-}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 9beb612..f3a0601 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -47,7 +47,6 @@ use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_core::config::util::empty_options;
use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
use hudi_core::table::Table as HudiTable;
-use hudi_core::util::StrTupleRef;
/// Create a `HudiDataSource`.
/// Used for Datafusion to query Hudi tables
@@ -179,7 +178,7 @@ impl TableProvider for HudiDataSource {
let pushdown_filters = exprs_to_filters(filters);
let file_slices = self
.table
- .get_file_slices_splits(self.get_input_partitions(), &pushdown_filters.as_strs())
+ .get_file_slices_splits(self.get_input_partitions(), pushdown_filters)
.await
.map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
let base_url = self.table.base_url();
diff --git a/demo/table-api-rust/src/main.rs b/demo/table-api-rust/src/main.rs
index 5a5c40c..056c247 100644
--- a/demo/table-api-rust/src/main.rs
+++ b/demo/table-api-rust/src/main.rs
@@ -18,6 +18,7 @@
*/
use arrow::compute::concat_batches;
+use hudi::config::util::empty_filters;
use hudi::error::Result;
use hudi::table::builder::TableBuilder as HudiTableBuilder;
@@ -28,7 +29,7 @@ async fn main() -> Result<()> {
"s3://hudi-demo/mor/parquet/v6_complexkeygen_hivestyle",
] {
let hudi_table = HudiTableBuilder::from_base_uri(url).build().await?;
- let batches = hudi_table.read_snapshot(&[]).await?;
+ let batches = hudi_table.read_snapshot(empty_filters()).await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
assert_eq!(
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 86c50e0..aac3658 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -33,7 +33,6 @@ use hudi::table::builder::TableBuilder;
use hudi::table::Table;
use hudi::timeline::instant::Instant;
use hudi::timeline::Timeline;
-use hudi::util::StrTupleRef;
use pyo3::exceptions::PyException;
use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
@@ -86,19 +85,16 @@ impl HudiFileGroupReader {
.to_pyarrow(py)
}
fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<PyObject> {
- let mut file_group = FileGroup::new(
- file_slice.file_id.clone(),
- file_slice.partition_path.clone(),
- );
+ let mut file_group = FileGroup::new_with_base_file_name(
+ &file_slice.base_file_name,
+ &file_slice.partition_path,
+ )
+ .map_err(PythonError::from)?;
+ let log_file_names = &file_slice.log_file_names;
file_group
- .add_base_file_from_name(&file_slice.base_file_name)
+ .add_log_files_from_names(log_file_names)
.map_err(PythonError::from)?;
- for name in file_slice.log_file_names.iter() {
- file_group
- .add_log_file_from_name(name)
- .map_err(PythonError::from)?;
- }
- let (_, inner_file_slice) = file_group
+ let (_, file_slice) = file_group
.file_slices
.iter()
.next()
@@ -109,7 +105,7 @@ impl HudiFileGroupReader {
))
})
.map_err(PythonError::from)?;
- rt().block_on(self.inner.read_file_slice(inner_file_slice))
+ rt().block_on(self.inner.read_file_slice(file_slice))
.map_err(PythonError::from)?
.to_pyarrow(py)
}
@@ -327,11 +323,12 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
- let filters = filters.unwrap_or_default();
-
py.allow_threads(|| {
let file_slices = rt()
- .block_on(self.inner.get_file_slices_splits(n, &filters.as_strs()))
+ .block_on(
+ self.inner
+ .get_file_slices_splits(n,
filters.unwrap_or_default()),
+ )
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
@@ -348,14 +345,13 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
- let filters = filters.unwrap_or_default();
-
py.allow_threads(|| {
let file_slices = rt()
- .block_on(
- self.inner
- .get_file_slices_splits_as_of(n, timestamp, &filters.as_strs()),
- )
+ .block_on(self.inner.get_file_slices_splits_as_of(
+ n,
+ timestamp,
+ filters.unwrap_or_default(),
+ ))
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
@@ -370,11 +366,9 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
- let filters = filters.unwrap_or_default();
-
py.allow_threads(|| {
let file_slices = rt()
- .block_on(self.inner.get_file_slices(&filters.as_strs()))
+ .block_on(self.inner.get_file_slices(filters.unwrap_or_default()))
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(HudiFileSlice::from).collect())
})
@@ -387,13 +381,11 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
- let filters = filters.unwrap_or_default();
-
py.allow_threads(|| {
let file_slices = rt()
.block_on(
self.inner
- .get_file_slices_as_of(timestamp, &filters.as_strs()),
+ .get_file_slices_as_of(timestamp, filters.unwrap_or_default()),
)
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(HudiFileSlice::from).collect())
@@ -436,9 +428,7 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
- let filters = filters.unwrap_or_default();
-
- rt().block_on(self.inner.read_snapshot(&filters.as_strs()))
+ rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
.map_err(PythonError::from)?
.to_pyarrow(py)
}
@@ -450,11 +440,9 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
- let filters = filters.unwrap_or_default();
-
rt().block_on(
self.inner
- .read_snapshot_as_of(timestamp, &filters.as_strs()),
+ .read_snapshot_as_of(timestamp, filters.unwrap_or_default()),
)
.map_err(PythonError::from)?
.to_pyarrow(py)