This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b2c60b1 refactor: add API stubs for performing incremental queries (#220)
b2c60b1 is described below
commit b2c60b13c92bde8e433455633f08815cb1af8a0b
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Dec 6 21:51:22 2024 -1000
refactor: add API stubs for performing incremental queries (#220)
---
crates/core/src/table/builder.rs | 11 +++++++-
crates/core/src/table/mod.rs | 58 ++++++++++++++++++++++++----------------
2 files changed, 45 insertions(+), 24 deletions(-)
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 20df88e..4ca2997 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -28,7 +28,9 @@ use strum::IntoEnumIterator;
use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
use crate::config::read::HudiReadConfig;
-use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
+use crate::config::table::HudiTableConfig::{
+ DropsPartitionFields, TableType, TableVersion, TimelineLayoutVersion,
+};
use crate::config::table::TableTypeValue::CopyOnWrite;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::util::{parse_data_for_options, split_hudi_options_from_others};
@@ -265,6 +267,13 @@ impl TableBuilder {
));
}
+ let timeline_layout_version = hudi_configs.get(TimelineLayoutVersion)?.to::<isize>();
+ if timeline_layout_version != 1 {
+ return Err(CoreError::Unsupported(
+ "Only support timeline layout version 1.".to_string(),
+ ));
+ }
+
let drops_partition_cols = hudi_configs
.get_or_default(DropsPartitionFields)
.to::<bool>();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index da32a10..9e7a495 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -243,6 +243,10 @@ impl Table {
.await
}
+ pub fn create_file_group_reader(&self) -> FileGroupReader {
+ FileGroupReader::new(self.file_system_view.storage.clone())
+ }
+
/// Get all the latest records in the table.
///
/// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
@@ -271,20 +275,22 @@ impl Table {
Ok(batches)
}
- #[cfg(test)]
- async fn get_file_paths_with_filters(
+ /// Get records that were inserted or updated between the given timestamps. Records that were updated multiple times should have their latest states within the time span being returned.
+ pub async fn read_incremental_records(
&self,
- filters: &[(&str, &str, &str)],
- ) -> Result<Vec<String>> {
- let mut file_paths = Vec::new();
- for f in self.get_file_slices(filters).await? {
- file_paths.push(f.base_file_path().to_string());
- }
- Ok(file_paths)
+ _start_timestamp: &str,
+ _end_timestamp: Option<&str>,
+ ) -> Result<Vec<RecordBatch>> {
+ todo!("read_incremental_states")
}
- pub fn create_file_group_reader(&self) -> FileGroupReader {
- FileGroupReader::new(self.file_system_view.storage.clone())
+ /// Get the change-data-capture (CDC) records between the given timestamps. The CDC records should reflect the records that were inserted, updated, and deleted between the timestamps.
+ pub async fn read_incremental_changes(
+ &self,
+ _start_timestamp: &str,
+ _end_timestamp: Option<&str>,
+ ) -> Result<Vec<RecordBatch>> {
+ todo!("read_incremental_changes")
}
}
@@ -326,6 +332,18 @@ mod tests {
.unwrap()
}
+ /// Test helper to get relative file paths from the table with filters.
+ async fn get_file_paths_with_filters(
+ table: &Table,
+ filters: &[(&str, &str, &str)],
+ ) -> Result<Vec<String>> {
+ let mut file_paths = Vec::new();
+ for f in table.get_file_slices(filters).await? {
+ file_paths.push(f.base_file_path().to_string());
+ }
+ Ok(file_paths)
+ }
+
#[tokio::test]
async fn test_hudi_table_get_hudi_options() {
let base_url = TestTable::V6Nonpartitioned.url();
@@ -700,8 +718,7 @@ mod tests {
assert_eq!(hudi_table.timeline.instants.len(), 2);
let partition_filters = &[];
- let actual = hudi_table
- .get_file_paths_with_filters(partition_filters)
+ let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
.await
.unwrap()
.into_iter()
@@ -717,8 +734,7 @@ mod tests {
assert_eq!(actual, expected);
let partition_filters = &[("byteField", ">=", "10"), ("byteField", "<", "30")];
- let actual = hudi_table
- .get_file_paths_with_filters(partition_filters)
+ let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
.await
.unwrap()
.into_iter()
@@ -733,8 +749,7 @@ mod tests {
assert_eq!(actual, expected);
let partition_filters = &[("byteField", ">", "30")];
- let actual = hudi_table
- .get_file_paths_with_filters(partition_filters)
+ let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
.await
.unwrap()
.into_iter()
@@ -750,8 +765,7 @@ mod tests {
assert_eq!(hudi_table.timeline.instants.len(), 2);
let partition_filters = &[];
- let actual = hudi_table
- .get_file_paths_with_filters(partition_filters)
+ let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
.await
.unwrap()
.into_iter()
@@ -771,8 +785,7 @@ mod tests {
("byteField", "<", "20"),
("shortField", "!=", "100"),
];
- let actual = hudi_table
- .get_file_paths_with_filters(partition_filters)
+ let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
.await
.unwrap()
.into_iter()
@@ -786,8 +799,7 @@ mod tests {
assert_eq!(actual, expected);
let partition_filters = &[("byteField", ">", "20"), ("shortField", "=", "300")];
- let actual = hudi_table
- .get_file_paths_with_filters(partition_filters)
+ let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
.await
.unwrap()
.into_iter()