This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 9488ab6 feat: add APIs to support incremental query impl (#272)
9488ab6 is described below
commit 9488ab69e4f2d403a6e7bc4a218d0e84f6efc40c
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jan 29 15:55:43 2025 -0600
feat: add APIs to support incremental query impl (#272)
- Add new table API `get_file_slices_between()` to support reading
incremental file slices for engine integration
- Add time range configs for the file group reader to support record filtering
and log file scanning:
- `hoodie.read.file_group.start_timestamp`
- `hoodie.read.file_group.end_timestamp`
- Remove `hoodie.read.as.of.timestamp` from configs in favor of passing the
time travel timestamp via the API
- Refactor the table API implementation to provide a clearer flow for reading
file slices
- Push down the base-file-only check and the instant range composition into
the file group reader
- Add the corresponding Python APIs
---
crates/core/src/config/read.rs | 21 +-
crates/core/src/file_group/file_slice.rs | 5 +
crates/core/src/file_group/reader.rs | 330 ++++++++++++++++---------------
crates/core/src/table/mod.rs | 277 +++++++++++++++-----------
crates/core/src/timeline/mod.rs | 14 +-
crates/datafusion/src/lib.rs | 2 +-
python/hudi/_internal.pyi | 44 ++++-
python/src/internal.rs | 135 ++++++++++---
python/tests/test_table_builder.py | 52 ++---
python/tests/test_table_read.py | 6 +-
10 files changed, 528 insertions(+), 358 deletions(-)
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index df0a8ad..f550724 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -33,18 +33,21 @@ use crate::config::{ConfigParser, HudiConfigValue};
/// **Example**
///
/// ```rust
-/// use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp,
InputPartitions};
+/// use hudi_core::config::read::HudiReadConfig::InputPartitions;
/// use hudi_core::table::Table as HudiTable;
///
-/// let options = [(InputPartitions, "2"), (AsOfTimestamp,
"20240101010100000")];
+/// let options = [(InputPartitions, "2")];
/// HudiTable::new_with_options("/tmp/hudi_data", options)
/// ```
///
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
- /// The query instant for time travel. Without specified this option, we
query the latest snapshot.
- AsOfTimestamp,
+ /// Start timestamp (exclusive) for [FileGroup] to filter records.
+ FileGroupStartTimestamp,
+
+ /// End timestamp (inclusive) for [FileGroup] to filter records.
+ FileGroupEndTimestamp,
/// Number of input partitions to read the data in parallel.
///
@@ -62,7 +65,8 @@ pub enum HudiReadConfig {
impl AsRef<str> for HudiReadConfig {
fn as_ref(&self) -> &str {
match self {
- Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
+ Self::FileGroupStartTimestamp =>
"hoodie.read.file_group.start_timestamp",
+ Self::FileGroupEndTimestamp =>
"hoodie.read.file_group.end_timestamp",
Self::InputPartitions => "hoodie.read.input.partitions",
Self::ListingParallelism => "hoodie.read.listing.parallelism",
Self::UseReadOptimizedMode =>
"hoodie.read.use.read_optimized.mode",
@@ -95,7 +99,12 @@ impl ConfigParser for HudiReadConfig {
.ok_or(NotFound(self.key()));
match self {
- Self::AsOfTimestamp => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
+ Self::FileGroupStartTimestamp => {
+ get_result.map(|v| HudiConfigValue::String(v.to_string()))
+ }
+ Self::FileGroupEndTimestamp => {
+ get_result.map(|v| HudiConfigValue::String(v.to_string()))
+ }
Self::InputPartitions => get_result
.and_then(|v| {
usize::from_str(v).map_err(|e| ParseInt(self.key(),
v.to_string(), e))
diff --git a/crates/core/src/file_group/file_slice.rs
b/crates/core/src/file_group/file_slice.rs
index b9df522..6ba2d0d 100644
--- a/crates/core/src/file_group/file_slice.rs
+++ b/crates/core/src/file_group/file_slice.rs
@@ -42,6 +42,11 @@ impl FileSlice {
}
}
+ #[inline]
+ pub fn has_log_file(&self) -> bool {
+ !self.log_files.is_empty()
+ }
+
fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
path.to_str().map(|s| s.to_string()).ok_or_else(|| {
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index 113a27a..69f9cf4 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -16,81 +16,122 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::config::read::HudiReadConfig;
use crate::config::table::HudiTableConfig;
use crate::config::util::split_hudi_options_from_others;
use crate::config::HudiConfigs;
use crate::error::CoreError::ReadFileSliceError;
use crate::expr::filter::{Filter, SchemableFilter};
use crate::file_group::file_slice::FileSlice;
+use crate::file_group::log_file::scanner::LogFileScanner;
+use crate::merge::record_merger::RecordMerger;
+use crate::metadata::meta_field::MetaField;
use crate::storage::Storage;
+use crate::timeline::selector::InstantRange;
use crate::Result;
use arrow::compute::and;
+use arrow::compute::filter_record_batch;
use arrow_array::{BooleanArray, RecordBatch};
-use arrow_schema::Schema;
use futures::TryFutureExt;
+use std::convert::TryFrom;
use std::sync::Arc;
-use crate::file_group::log_file::scanner::LogFileScanner;
-use crate::merge::record_merger::RecordMerger;
-use crate::timeline::selector::InstantRange;
-use arrow::compute::filter_record_batch;
-
-/// File group reader handles all read operations against a file group.
+/// The reader that handles all read operations against a file group.
#[derive(Clone, Debug)]
pub struct FileGroupReader {
hudi_configs: Arc<HudiConfigs>,
storage: Arc<Storage>,
- and_filters: Vec<SchemableFilter>,
}
impl FileGroupReader {
- pub(crate) fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>)
-> Self {
- Self {
- storage,
- hudi_configs,
- and_filters: Vec::new(),
- }
- }
-
- pub(crate) fn new_with_filters(
- storage: Arc<Storage>,
+ pub(crate) fn new_with_configs_and_options<I, K, V>(
hudi_configs: Arc<HudiConfigs>,
- and_filters: &[Filter],
- schema: &Schema,
- ) -> Result<Self> {
- let and_filters = and_filters
- .iter()
- .map(|filter| SchemableFilter::try_from((filter.clone(), schema)))
- .collect::<Result<Vec<SchemableFilter>>>()?;
+ options: I,
+ ) -> Result<Self>
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
+ let (hudi_opts, others) = split_hudi_options_from_others(options);
+
+ let mut final_opts = hudi_configs.as_options();
+ final_opts.extend(hudi_opts);
+ let hudi_configs = Arc::new(HudiConfigs::new(final_opts));
+ let storage = Storage::new(Arc::new(others), hudi_configs.clone())?;
Ok(Self {
- storage,
hudi_configs,
- and_filters,
+ storage,
})
}
+ /// Creates a new reader with the given base URI and options.
+ ///
+ /// # Arguments
+ /// * `base_uri` - The base URI of the file group's residing table.
+ /// * `options` - Additional options for the reader.
pub fn new_with_options<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
- let (mut hudi_opts, others) = split_hudi_options_from_others(options);
- hudi_opts.insert(
+ let hudi_configs = Arc::new(HudiConfigs::new([(
HudiTableConfig::BasePath.as_ref().to_string(),
base_uri.to_string(),
- );
-
- let hudi_configs = Arc::new(HudiConfigs::new(hudi_opts));
+ )]));
- let storage = Storage::new(Arc::new(others), hudi_configs.clone())?;
- Ok(Self::new(hudi_configs, storage))
+ Self::new_with_configs_and_options(hudi_configs, options)
}
- fn create_boolean_array_mask(&self, records: &RecordBatch) ->
Result<BooleanArray> {
+ fn create_filtering_mask_for_base_file_records(
+ &self,
+ records: &RecordBatch,
+ ) -> Result<Option<BooleanArray>> {
+ let populates_meta_fields = self
+ .hudi_configs
+ .get_or_default(HudiTableConfig::PopulatesMetaFields)
+ .to::<bool>();
+ if !populates_meta_fields {
+ // If meta fields are not populated, commit time filtering is not
applicable.
+ return Ok(None);
+ }
+
+ let mut and_filters: Vec<SchemableFilter> = Vec::new();
+ let schema = MetaField::schema();
+ if let Some(start) = self
+ .hudi_configs
+ .try_get(HudiReadConfig::FileGroupStartTimestamp)
+ .map(|v| v.to::<String>())
+ {
+ let filter: Filter =
+ Filter::try_from((MetaField::CommitTime.as_ref(), ">",
start.as_str()))?;
+ let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
+ and_filters.push(filter);
+ } else {
+ // If start timestamp is not provided, the query is snapshot or
time-travel, so
+ // commit time filtering is not needed as the base file being read
is already
+ // filtered and selected by the timeline.
+ return Ok(None);
+ }
+
+ if let Some(end) = self
+ .hudi_configs
+ .try_get(HudiReadConfig::FileGroupEndTimestamp)
+ .map(|v| v.to::<String>())
+ {
+ let filter = Filter::try_from((MetaField::CommitTime.as_ref(),
"<=", end.as_str()))?;
+ let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
+ and_filters.push(filter);
+ }
+
+ if and_filters.is_empty() {
+ return Ok(None);
+ }
+
let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
- for filter in &self.and_filters {
+ for filter in &and_filters {
let col_name = filter.field.name().as_str();
let col_values = records
.column_by_name(col_name)
@@ -99,9 +140,16 @@ impl FileGroupReader {
let comparison = filter.apply_comparsion(col_values)?;
mask = and(&mask, &comparison)?;
}
- Ok(mask)
+ Ok(Some(mask))
}
+ /// Reads the data from the base file at the given relative path.
+ ///
+ /// # Arguments
+ /// * `relative_path` - The relative path to the base file.
+ ///
+ /// # Returns
+ /// A record batch read from the base file.
pub async fn read_file_slice_by_base_file_path(
&self,
relative_path: &str,
@@ -112,22 +160,44 @@ impl FileGroupReader {
.map_err(|e| ReadFileSliceError(format!("Failed to read path
{relative_path}: {e:?}")))
.await?;
- if self.and_filters.is_empty() {
- return Ok(records);
+ if let Some(mask) =
self.create_filtering_mask_for_base_file_records(&records)? {
+ filter_record_batch(&records, &mask)
+ .map_err(|e| ReadFileSliceError(format!("Failed to filter
records: {e:?}")))
+ } else {
+ Ok(records)
}
+ }
- let mask = self.create_boolean_array_mask(&records)?;
- filter_record_batch(&records, &mask)
- .map_err(|e| ReadFileSliceError(format!("Failed to filter records:
{e:?}")))
+ fn create_instant_range_for_log_file_scan(&self) -> InstantRange {
+ let timezone = self
+ .hudi_configs
+ .get_or_default(HudiTableConfig::TimelineTimezone)
+ .to::<String>();
+ let start_timestamp = self
+ .hudi_configs
+ .try_get(HudiReadConfig::FileGroupStartTimestamp)
+ .map(|v| v.to::<String>());
+ let end_timestamp = self
+ .hudi_configs
+ .try_get(HudiReadConfig::FileGroupEndTimestamp)
+ .map(|v| v.to::<String>());
+ InstantRange::new(timezone, start_timestamp, end_timestamp, false,
true)
}
- pub(crate) async fn read_file_slice(
- &self,
- file_slice: &FileSlice,
- base_file_only: bool,
- instant_range: InstantRange,
- ) -> Result<RecordBatch> {
+ /// Reads the data from the given file slice.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ ///
+ /// # Returns
+ /// A record batch read from the file slice.
+ pub async fn read_file_slice(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
let relative_path = file_slice.base_file_relative_path()?;
+ let base_file_only = !file_slice.has_log_file()
+ || self
+ .hudi_configs
+ .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+ .to::<bool>();
if base_file_only {
self.read_file_slice_by_base_file_path(&relative_path).await
} else {
@@ -142,6 +212,7 @@ impl FileGroupReader {
.iter()
.map(|log_file| file_slice.log_file_relative_path(log_file))
.collect::<Result<Vec<String>>>()?;
+ let instant_range = self.create_instant_range_for_log_file_scan();
let log_record_batches =
LogFileScanner::new(self.hudi_configs.clone(),
self.storage.clone())
.scan(log_file_paths, &instant_range)
@@ -159,71 +230,12 @@ impl FileGroupReader {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::config::util::empty_options;
use crate::error::CoreError;
- use crate::expr::filter::FilterField;
use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;
- use url::Url;
-
- #[test]
- fn test_new() {
- let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
- let storage = Storage::new_with_base_url(base_url).unwrap();
- let fg_reader = FileGroupReader::new(Arc::from(HudiConfigs::empty()),
storage.clone());
- assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
- }
-
- fn create_test_schema() -> Schema {
- Schema::new(vec![
- Field::new("_hoodie_commit_time", DataType::Utf8, false),
- Field::new("name", DataType::Utf8, false),
- Field::new("age", DataType::Int64, false),
- ])
- }
-
- #[tokio::test]
- async fn test_new_with_filters() -> Result<()> {
- let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
- let storage = Storage::new_with_base_url(base_url)?;
- let schema = create_test_schema();
- let empty_configs = Arc::new(HudiConfigs::empty());
-
- // Test case 1: Empty filters
- let reader = FileGroupReader::new_with_filters(
- storage.clone(),
- empty_configs.clone(),
- &[],
- &schema,
- )?;
- assert!(reader.and_filters.is_empty());
-
- // Test case 2: Multiple filters
- let filters = vec![
- FilterField::new("_hoodie_commit_time").gt("0"),
- FilterField::new("age").gte("18"),
- ];
- let reader = FileGroupReader::new_with_filters(
- storage.clone(),
- empty_configs.clone(),
- &filters,
- &schema,
- )?;
- assert_eq!(reader.and_filters.len(), 2);
-
- // Test case 3: Invalid field name should error
- let invalid_filters =
vec![FilterField::new("non_existent_field").eq("value")];
- assert!(FileGroupReader::new_with_filters(
- storage.clone(),
- empty_configs.clone(),
- &invalid_filters,
- &schema
- )
- .is_err());
-
- Ok(())
- }
#[test]
fn test_new_with_options() -> Result<()> {
@@ -239,11 +251,9 @@ mod tests {
#[tokio::test]
async fn test_read_file_slice_returns_error() {
- let storage =
-
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())
+ let reader =
+
FileGroupReader::new_with_options("file:///non-existent-path/table",
empty_options())
.unwrap();
- let empty_configs = Arc::new(HudiConfigs::empty());
- let reader = FileGroupReader::new(empty_configs.clone(), storage);
let result = reader
.read_file_slice_by_base_file_path("non_existent_file")
.await;
@@ -251,7 +261,12 @@ mod tests {
}
fn create_test_record_batch() -> Result<RecordBatch> {
- let schema = Arc::new(create_test_schema());
+ let schema = Schema::new(vec![
+ Field::new("_hoodie_commit_time", DataType::Utf8, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("age", DataType::Int64, false),
+ ]);
+ let schema = Arc::new(schema);
let commit_times: ArrayRef = Arc::new(StringArray::from(vec!["1", "2",
"3", "4", "5"]));
let names: ArrayRef = Arc::new(StringArray::from(vec![
@@ -263,67 +278,60 @@ mod tests {
}
#[test]
- fn test_create_boolean_array_mask() -> Result<()> {
- let storage =
-
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())?;
- let empty_configs = Arc::new(HudiConfigs::empty());
- let schema = create_test_schema();
+ fn test_create_filtering_mask_for_base_file_records() -> Result<()> {
+ let base_uri = "file:///non-existent-path/table";
let records = create_test_record_batch()?;
-
- // Test case 1: No filters
- let reader = FileGroupReader::new_with_filters(
- storage.clone(),
- empty_configs.clone(),
- &[],
- &schema,
+ // Test case 1: No meta fields populated
+ let reader = FileGroupReader::new_with_options(
+ base_uri,
+ [
+ (HudiTableConfig::PopulatesMetaFields.as_ref(), "false"),
+ (HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
+ ],
)?;
- let mask = reader.create_boolean_array_mask(&records)?;
- assert_eq!(mask, BooleanArray::from(vec![true; 5]));
-
- // Test case 2: Single filter on commit time
- let filters = vec![FilterField::new("_hoodie_commit_time").gt("2")];
- let reader = FileGroupReader::new_with_filters(
- storage.clone(),
- empty_configs.clone(),
- &filters,
- &schema,
+ let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
+ assert_eq!(mask, None, "Commit time filtering should not be needed");
+
+ // Test case 2: No commit time filtering options
+ let reader = FileGroupReader::new_with_options(base_uri,
empty_options())?;
+ let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
+ assert_eq!(mask, None);
+
+ // Test case 3: Filtering commit time > '2'
+ let reader = FileGroupReader::new_with_options(
+ base_uri,
+ [(HudiReadConfig::FileGroupStartTimestamp, "2")],
)?;
- let mask = reader.create_boolean_array_mask(&records)?;
+ let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
assert_eq!(
mask,
- BooleanArray::from(vec![false, false, true, true, true]),
+ Some(BooleanArray::from(vec![false, false, true, true, true])),
"Expected only records with commit_time > '2'"
);
- // Test case 3: Multiple AND filters
- let filters = vec![
- FilterField::new("_hoodie_commit_time").gt("2"),
- FilterField::new("age").lt("40"),
- ];
- let reader = FileGroupReader::new_with_filters(
- storage.clone(),
- empty_configs.clone(),
- &filters,
- &schema,
+ // Test case 4: Filtering commit time <= '4'
+ let reader = FileGroupReader::new_with_options(
+ base_uri,
+ [(HudiReadConfig::FileGroupEndTimestamp, "4")],
+ )?;
+ let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
+ assert_eq!(mask, None, "Commit time filtering should not be needed");
+
+ // Test case 5: Filtering commit time > '2' and <= '4'
+ let reader = FileGroupReader::new_with_options(
+ base_uri,
+ [
+ (HudiReadConfig::FileGroupStartTimestamp, "2"),
+ (HudiReadConfig::FileGroupEndTimestamp, "4"),
+ ],
)?;
- let mask = reader.create_boolean_array_mask(&records)?;
+ let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
assert_eq!(
mask,
- BooleanArray::from(vec![false, false, true, false, false]),
- "Expected only record with commit_time > '2' AND age < 40"
+ Some(BooleanArray::from(vec![false, false, true, true, false])),
+ "Expected only records with commit_time > '2' and <= '4'"
);
- // Test case 4: Filter resulting in all false
- let filters = vec![FilterField::new("age").gt("100")];
- let reader = FileGroupReader::new_with_filters(
- storage.clone(),
- empty_configs.clone(),
- &filters,
- &schema,
- )?;
- let mask = reader.create_boolean_array_mask(&records)?;
- assert_eq!(mask, BooleanArray::from(vec![false; 5]));
-
Ok(())
}
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7a2cf09..511f261 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -90,7 +90,6 @@ mod fs_view;
mod listing;
pub mod partition;
-use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
@@ -100,11 +99,10 @@ use crate::file_group::reader::FileGroupReader;
use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
-use crate::timeline::Timeline;
+use crate::timeline::{Timeline, EARLIEST_START_TIMESTAMP};
use crate::Result;
-use crate::metadata::meta_field::MetaField;
-use crate::timeline::selector::InstantRange;
+use crate::config::read::HudiReadConfig;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use std::collections::{HashMap, HashSet};
@@ -211,22 +209,18 @@ impl Table {
Ok(Schema::new(partition_fields))
}
- /// Get all the [FileSlice]s in the table.
- ///
- /// The file slices are split into `n` chunks.
- ///
- /// If the [AsOfTimestamp] configuration is set, the file slices at the
specified timestamp will be returned.
+ /// Get all the [FileSlice]s in splits from the table.
///
+ /// # Arguments
+ /// * `n` - The number of chunks to split the file slices into.
+ /// * `filters` - Partition filters to apply.
pub async fn get_file_slices_splits(
&self,
n: usize,
filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
- let filters = from_str_tuples(filters)?;
- if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
- self.get_file_slices_splits_internal(n,
timestamp.to::<String>().as_str(), &filters)
- .await
- } else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
+ if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ let filters = from_str_tuples(filters)?;
self.get_file_slices_splits_internal(n, timestamp, &filters)
.await
} else {
@@ -234,6 +228,12 @@ impl Table {
}
}
+ /// Get all the [FileSlice]s in splits from the table at a given timestamp.
+ ///
+ /// # Arguments
+ /// * `n` - The number of chunks to split the file slices into.
+ /// * `timestamp` - The timestamp which file slices associated with.
+ /// * `filters` - Partition filters to apply.
pub async fn get_file_slices_splits_as_of(
&self,
n: usize,
@@ -267,20 +267,28 @@ impl Table {
/// Get all the [FileSlice]s in the table.
///
- /// If the [AsOfTimestamp] configuration is set, the file slices at the
specified timestamp will be returned.
+ /// # Arguments
+ /// * `filters` - Partition filters to apply.
+ ///
+ /// # Notes
+ /// * This API is useful for implementing snapshot query.
pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<FileSlice>> {
- let filters = from_str_tuples(filters)?;
- if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
- self.get_file_slices_internal(timestamp.to::<String>().as_str(),
&filters)
- .await
- } else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
+ if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ let filters = from_str_tuples(filters)?;
self.get_file_slices_internal(timestamp, &filters).await
} else {
Ok(Vec::new())
}
}
- /// Get all the [FileSlice]s at a given timestamp, as a time travel query.
+ /// Get all the [FileSlice]s in the table at a given timestamp.
+ ///
+ /// # Arguments
+ /// * `timestamp` - The timestamp which file slices associated with.
+ /// * `filters` - Partition filters to apply.
+ ///
+ /// # Notes
+ /// * This API is useful for implementing time travel query.
pub async fn get_file_slices_as_of(
&self,
timestamp: &str,
@@ -307,91 +315,124 @@ impl Table {
.await
}
- pub fn create_file_group_reader(&self) -> FileGroupReader {
- FileGroupReader::new(
- self.hudi_configs.clone(),
- self.file_system_view.storage.clone(),
- )
+ /// Get all the changed [FileSlice]s in the table between the given
timestamps.
+ ///
+ /// # Arguments
+ /// * `start_timestamp` - If provided, only file slices that were
changed after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only file slices that were
changed before or at this timestamp will be returned.
+ ///
+ /// # Notes
+ /// * This API is useful for implementing incremental query.
+ pub async fn get_file_slices_between(
+ &self,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<FileSlice>> {
+ // If the end timestamp is not provided, use the latest commit
timestamp.
+ let Some(end) = end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp())
+ else {
+ // No latest commit timestamp means the table is empty.
+ return Ok(Vec::new());
+ };
+
+ let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+ self.get_file_slices_between_internal(start, end).await
}
- fn create_file_group_reader_with_filters(
+ async fn get_file_slices_between_internal(
&self,
- filters: &[(&str, &str, &str)],
- schema: &Schema,
- ) -> Result<FileGroupReader> {
- let filters = from_str_tuples(filters)?;
- FileGroupReader::new_with_filters(
- self.file_system_view.storage.clone(),
+ start_timestamp: &str,
+ end_timestamp: &str,
+ ) -> Result<Vec<FileSlice>> {
+ let mut file_slices: Vec<FileSlice> = Vec::new();
+ let file_groups = self
+ .timeline
+ .get_file_groups_between(Some(start_timestamp),
Some(end_timestamp))
+ .await?;
+ for file_group in file_groups {
+ if let Some(file_slice) =
file_group.get_file_slice_as_of(end_timestamp) {
+ file_slices.push(file_slice.clone());
+ }
+ }
+
+ Ok(file_slices)
+ }
+
+ /// Create a [FileGroupReader] using the [Table]'s Hudi configs, and
overwriting options.
+ pub fn create_file_group_reader_with_options<I, K, V>(
+ &self,
+ options: I,
+ ) -> Result<FileGroupReader>
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
+ let mut overwriting_options =
HashMap::with_capacity(self.storage_options.len());
+ for (k, v) in self.storage_options.iter() {
+ overwriting_options.insert(k.clone(), v.clone());
+ }
+ for (k, v) in options {
+ overwriting_options.insert(k.as_ref().to_string(), v.into());
+ }
+ FileGroupReader::new_with_configs_and_options(
self.hudi_configs.clone(),
- &filters,
- schema,
+ overwriting_options,
)
}
/// Get all the latest records in the table.
///
- /// If the [AsOfTimestamp] configuration is set, the records at the
specified timestamp will be returned.
+ /// # Arguments
+ /// * `filters` - Partition filters to apply.
pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<RecordBatch>> {
- let filters = from_str_tuples(filters)?;
- let read_optimized_mode = self
- .hudi_configs
- .get_or_default(UseReadOptimizedMode)
- .to::<bool>();
-
- if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
- self.read_snapshot_internal(
- timestamp.to::<String>().as_str(),
- &filters,
- read_optimized_mode,
- )
- .await
- } else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
- self.read_snapshot_internal(timestamp, &filters,
read_optimized_mode)
- .await
+ if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ let filters = from_str_tuples(filters)?;
+ self.read_snapshot_internal(timestamp, &filters).await
} else {
Ok(Vec::new())
}
}
- /// Get all the records in the table at a given timestamp, as a time
travel query.
+ /// Get all the records in the table at a given timestamp.
+ ///
+ /// # Arguments
+ /// * `timestamp` - The timestamp which records associated with.
+ /// * `filters` - Partition filters to apply.
pub async fn read_snapshot_as_of(
&self,
timestamp: &str,
filters: &[(&str, &str, &str)],
) -> Result<Vec<RecordBatch>> {
let filters = from_str_tuples(filters)?;
- let read_optimized_mode = self
- .hudi_configs
- .get_or_default(UseReadOptimizedMode)
- .to::<bool>();
- self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
- .await
+ self.read_snapshot_internal(timestamp, &filters).await
}
async fn read_snapshot_internal(
&self,
timestamp: &str,
filters: &[Filter],
- read_optimized_mode: bool,
) -> Result<Vec<RecordBatch>> {
let file_slices = self.get_file_slices_internal(timestamp,
filters).await?;
- let fg_reader = self.create_file_group_reader();
- let base_file_only =
- read_optimized_mode || self.table_type() ==
TableTypeValue::CopyOnWrite;
- let timezone = self.timezone();
- let instant_range = InstantRange::up_to(timestamp, &timezone);
- let batches = futures::future::try_join_all(
- file_slices
- .iter()
- .map(|f| fg_reader.read_file_slice(f, base_file_only,
instant_range.clone())),
- )
- .await?;
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp,
+ )])?;
+ let batches =
+ futures::future::try_join_all(file_slices.iter().map(|f|
fg_reader.read_file_slice(f)))
+ .await?;
Ok(batches)
}
/// Get records that were inserted or updated between the given timestamps.
+ ///
/// Records that were updated multiple times should have their latest
states within
/// the time span being returned.
+ ///
+ /// # Arguments
+ /// * `start_timestamp` - Only records that were inserted or updated
after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only records that were inserted
or updated before or at this timestamp will be returned.
pub async fn read_incremental_records(
&self,
start_timestamp: &str,
@@ -404,41 +445,23 @@ impl Table {
return Ok(Vec::new());
};
- // Use timestamp range to get the target file slices.
- let mut file_slices: Vec<FileSlice> = Vec::new();
- let file_groups = self
- .timeline
- .get_incremental_file_groups(Some(start_timestamp),
Some(end_timestamp))
+ let file_slices = self
+ .get_file_slices_between_internal(start_timestamp, end_timestamp)
.await?;
- for file_group in file_groups {
- if let Some(file_slice) =
file_group.get_file_slice_as_of(end_timestamp) {
- file_slices.push(file_slice.clone());
- }
- }
- // Read incremental records from the file slices.
- let filters = &[
- (MetaField::CommitTime.as_ref(), ">", start_timestamp),
- (MetaField::CommitTime.as_ref(), "<=", end_timestamp),
- ];
- let schema = MetaField::schema();
- let fg_reader = self.create_file_group_reader_with_filters(filters,
&schema)?;
-
- // Read-optimized mode does not apply to incremental query semantics.
- let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
- let timezone = self.timezone();
- let instant_range =
- InstantRange::within_open_closed(start_timestamp, end_timestamp,
&timezone);
- let batches = futures::future::try_join_all(
- file_slices
- .iter()
- .map(|f| fg_reader.read_file_slice(f, base_file_only,
instant_range.clone())),
- )
- .await?;
+ let fg_reader = self.create_file_group_reader_with_options([
+ (HudiReadConfig::FileGroupStartTimestamp, start_timestamp),
+ (HudiReadConfig::FileGroupEndTimestamp, end_timestamp),
+ ])?;
+
+ let batches =
+ futures::future::try_join_all(file_slices.iter().map(|f|
fg_reader.read_file_slice(f)))
+ .await?;
Ok(batches)
}
/// Get the change-data-capture (CDC) records between the given timestamps.
+ ///
/// The CDC records should reflect the records that were inserted,
updated, and deleted
/// between the timestamps.
#[allow(dead_code)]
@@ -454,13 +477,13 @@ impl Table {
#[cfg(test)]
mod tests {
use super::*;
- use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::HudiTableConfig::{
BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields,
IsHiveStylePartitioning,
IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields,
PopulatesMetaFields,
PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
TimelineLayoutVersion, TimelineTimezone,
};
+ use crate::config::util::empty_options;
use crate::config::HUDI_CONF_DIR;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
@@ -799,7 +822,8 @@ mod tests {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let batches = hudi_table
- .create_file_group_reader()
+ .create_file_group_reader_with_options(empty_options())
+ .unwrap()
.read_file_slice_by_base_file_path(
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
)
@@ -830,7 +854,7 @@ mod tests {
}
#[tokio::test]
- async fn hudi_table_get_file_slices_splits_as_of() {
+ async fn hudi_table_get_file_slices_splits_as_of_timestamps() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
let hudi_table = Table::new(base_url.path()).await.unwrap();
@@ -907,11 +931,13 @@ mod tests {
);
// as of just smaller than the latest timestamp
- let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
- let hudi_table = Table::new_with_options(base_url.path(), opts)
+ let hudi_table = Table::new_with_options(base_url.path(),
empty_options())
+ .await
+ .unwrap();
+ let file_slices = hudi_table
+ .get_file_slices_as_of("20240418173551905", &[])
.await
.unwrap();
- let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
@@ -921,11 +947,13 @@ mod tests {
);
// as of non-exist old timestamp
- let opts = [(AsOfTimestamp.as_ref(), "19700101000000")];
- let hudi_table = Table::new_with_options(base_url.path(), opts)
+ let hudi_table = Table::new_with_options(base_url.path(),
empty_options())
+ .await
+ .unwrap();
+ let file_slices = hudi_table
+ .get_file_slices_as_of("19700101000000", &[])
.await
.unwrap();
- let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
@@ -935,6 +963,29 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn empty_hudi_table_get_file_slices_between_timestamps() {
+ let base_url = SampleTable::V6Empty.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let file_slices = hudi_table
+ .get_file_slices_between(Some(EARLIEST_START_TIMESTAMP), None)
+ .await
+ .unwrap();
+ assert!(file_slices.is_empty())
+ }
+
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_between_timestamps() {
+ let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let file_slices = hudi_table
+ .get_file_slices_between(None, Some("20250121000656060"))
+ .await
+ .unwrap();
+ assert_eq!(file_slices.len(), 3);
+ // TODO: Add more assertions
+ }
+
#[tokio::test]
async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
@@ -1071,9 +1122,11 @@ mod tests {
#[tokio::test]
async fn test_non_partitioned_read_optimized() -> Result<()> {
let base_url = SampleTable::V6Nonpartitioned.url_to_mor();
- let hudi_table =
- Table::new_with_options(base_url.path(),
[(UseReadOptimizedMode.as_ref(), "true")])
- .await?;
+ let hudi_table = Table::new_with_options(
+ base_url.path(),
+ [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
+ )
+ .await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index dccd08a..9722393 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -45,9 +45,11 @@ pub struct Timeline {
pub completed_commits: Vec<Instant>,
}
+pub const EARLIEST_START_TIMESTAMP: &str = "19700101000000000";
+
impl Timeline {
#[cfg(test)]
- pub async fn new_from_completed_commits(
+ pub(crate) async fn new_from_completed_commits(
hudi_configs: Arc<HudiConfigs>,
storage_options: Arc<HashMap<String, String>>,
completed_commits: Vec<Instant>,
@@ -60,7 +62,7 @@ impl Timeline {
})
}
- pub async fn new_from_storage(
+ pub(crate) async fn new_from_storage(
hudi_configs: Arc<HudiConfigs>,
storage_options: Arc<HashMap<String, String>>,
) -> Result<Self> {
@@ -104,7 +106,7 @@ impl Timeline {
Ok(instants)
}
- pub fn get_latest_commit_timestamp(&self) -> Option<&str> {
+ pub(crate) fn get_latest_commit_timestamp(&self) -> Option<&str> {
self.completed_commits
.iter()
.next_back()
@@ -126,7 +128,7 @@ impl Timeline {
.map_err(|e| CoreError::Timeline(format!("Failed to get commit
metadata: {}", e)))
}
- pub async fn get_latest_schema(&self) -> Result<Schema> {
+ pub(crate) async fn get_latest_schema(&self) -> Result<Schema> {
let commit_metadata = self.get_latest_commit_metadata().await?;
let first_partition = commit_metadata
@@ -181,7 +183,7 @@ impl Timeline {
}
}
- pub async fn get_replaced_file_groups_as_of(
+ pub(crate) async fn get_replaced_file_groups_as_of(
&self,
timestamp: &str,
) -> Result<HashSet<FileGroup>> {
@@ -203,7 +205,7 @@ impl Timeline {
/// Get file groups in the timeline ranging from start (exclusive) to end (inclusive).
/// File groups are as of the [end] timestamp or the latest if not given.
- pub async fn get_incremental_file_groups(
+ pub(crate) async fn get_file_groups_between(
&self,
start_timestamp: Option<&str>,
end_timestamp: Option<&str>,
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 50a7a05..094c41c 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -67,7 +67,7 @@ use hudi_core::util::StrTupleRef;
/// // Create a new HudiDataSource with specific read options
/// let hudi = HudiDataSource::new_with_options(
/// "/tmp/trips_table",
-/// [("hoodie.read.as.of.timestamp", "20241122010827898")]).await?;
+/// [("hoodie.read.input.partitions", 5)]).await?;
///
/// // Register the Hudi table with the session context
/// ctx.register_table("trips_table", Arc::new(hudi))?;
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index d6f6623..a884265 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -24,11 +24,11 @@ __version__: str
@dataclass(init=False)
class HudiFileGroupReader:
"""
- A reader for a group of Hudi file slices. Allows reading of records from the base file in a Hudi table.
+ The reader that handles all read operations against a file group.
Attributes:
- base_uri (str): The base URI of the Hudi table.
- options (Optional[Dict[str, str]]): Additional options for reading the file group.
+ base_uri (str): The base URI of the file group's residing table.
+ options (Optional[Dict[str, str]]): Additional options for the reader.
"""
def __init__(self, base_uri: str, options: Optional[Dict[str, str]] =
None):
"""
@@ -43,13 +43,24 @@ class HudiFileGroupReader:
self, relative_path: str
) -> "pyarrow.RecordBatch":
"""
- Reads the data from the base file given a relative path.
+ Reads the data from the base file at the given relative path.
Parameters:
relative_path (str): The relative path to the base file.
Returns:
- pyarrow.RecordBatch: A batch of records read from the base file.
+ pyarrow.RecordBatch: A record batch read from the base file.
+ """
+ ...
+ def read_file_slice(self, file_slice: HudiFileSlice) ->
"pyarrow.RecordBatch":
+ """
+ Reads the data from the given file slice.
+
+ Parameters:
+ file_slice (HudiFileSlice): The file slice to read from.
+
+ Returns:
+ pyarrow.RecordBatch: A record batch read from the file slice.
"""
...
@@ -66,6 +77,7 @@ class HudiFileSlice:
base_file_name (str): The name of the base file.
base_file_size (int): The on-disk size of the base file in bytes.
base_file_byte_size (int): The in-memory size of the base file in bytes.
+ log_file_names (List[str]): The names of the ordered log files.
num_records (int): The number of records in the file slice.
"""
@@ -75,6 +87,7 @@ class HudiFileSlice:
base_file_name: str
base_file_size: int
base_file_byte_size: int
+ log_file_names: List[str]
num_records: int
def base_file_relative_path(self) -> str:
@@ -85,6 +98,14 @@ class HudiFileSlice:
str: The relative path of the base file.
"""
...
+ def log_files_relative_paths(self) -> List[str]:
+ """
+ Returns the relative paths of the log files for this file slice.
+
+ Returns:
+ List[str]: A list of relative paths of the log files.
+ """
+ ...
@dataclass(init=False)
class HudiTable:
@@ -182,7 +203,18 @@ class HudiTable:
Retrieves all file slices in the Hudi table as of a timestamp, optionally filtered by the provided filters.
"""
...
- def create_file_group_reader(self) -> HudiFileGroupReader:
+ def get_file_slices_between(
+ self,
+ start_timestamp: Optional[str],
+ end_timestamp: Optional[str],
+ ) -> List[HudiFileSlice]:
+ """
+ Retrieves all changed file slices in the Hudi table between the given timestamps.
+ """
+ ...
+ def create_file_group_reader_with_options(
+ self, options: Optional[Dict[str, str]] = None
+ ) -> HudiFileGroupReader:
"""
Creates a HudiFileGroupReader for reading records from file groups in the Hudi table.
diff --git a/python/src/internal.rs b/python/src/internal.rs
index b09ea12..6ee9a01 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -27,6 +27,7 @@ use tokio::runtime::Runtime;
use hudi::error::CoreError;
use hudi::file_group::file_slice::FileSlice;
use hudi::file_group::reader::FileGroupReader;
+use hudi::file_group::FileGroup;
use hudi::storage::error::StorageError;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
@@ -82,6 +83,34 @@ impl HudiFileGroupReader {
.map_err(PythonError::from)?
.to_pyarrow(py)
}
+ fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) ->
PyResult<PyObject> {
+ let mut file_group = FileGroup::new(
+ file_slice.file_id.clone(),
+ file_slice.partition_path.clone(),
+ );
+ file_group
+ .add_base_file_from_name(&file_slice.base_file_name)
+ .map_err(PythonError::from)?;
+ for name in file_slice.log_file_names.iter() {
+ file_group
+ .add_log_file_from_name(name)
+ .map_err(PythonError::from)?;
+ }
+ let (_, inner_file_slice) = file_group
+ .file_slices
+ .iter()
+ .next()
+ .ok_or_else(|| {
+ CoreError::FileGroup(format!(
+ "Failed to get file slice from file group: {:?}",
+ file_group
+ ))
+ })
+ .map_err(PythonError::from)?;
+ rt().block_on(self.inner.read_file_slice(inner_file_slice))
+ .map_err(PythonError::from)?
+ .to_pyarrow(py)
+ }
}
#[cfg(not(tarpaulin))]
@@ -101,6 +130,8 @@ pub struct HudiFileSlice {
#[pyo3(get)]
base_file_byte_size: i64,
#[pyo3(get)]
+ log_file_names: Vec<String>,
+ #[pyo3(get)]
num_records: i64,
}
@@ -112,34 +143,59 @@ impl HudiFileSlice {
.join(&self.base_file_name)
.to_str()
.map(String::from)
- .ok_or(StorageError::InvalidPath(format!(
- "Failed to get base file relative path for file slice: {:?}",
- self
- )))
+ .ok_or_else(|| {
+ StorageError::InvalidPath(format!(
+ "Failed to get base file relative path for file slice:
{:?}",
+ self
+ ))
+ })
.map_err(CoreError::from)
.map_err(PythonError::from)?;
Ok(path)
}
+ fn log_files_relative_paths(&self) -> PyResult<Vec<String>> {
+ let mut paths = Vec::<String>::new();
+ for name in self.log_file_names.iter() {
+ let p = PathBuf::from(&self.partition_path)
+ .join(name)
+ .to_str()
+ .map(String::from)
+ .ok_or_else(|| {
+ StorageError::InvalidPath(format!(
+ "Failed to get log file relative path for file slice:
{:?}",
+ self
+ ))
+ })
+ .map_err(CoreError::from)
+ .map_err(PythonError::from)?;
+ paths.push(p)
+ }
+ Ok(paths)
+ }
}
#[cfg(not(tarpaulin))]
-fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
- let file_id = f.file_id().to_string();
- let partition_path = f.partition_path.to_string();
- let creation_instant_time = f.creation_instant_time().to_string();
- let base_file_name = f.base_file.file_name();
- let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();
- let base_file_size = file_metadata.size;
- let base_file_byte_size = file_metadata.byte_size;
- let num_records = file_metadata.num_records;
- HudiFileSlice {
- file_id,
- partition_path,
- creation_instant_time,
- base_file_name,
- base_file_size,
- base_file_byte_size,
- num_records,
+impl From<&FileSlice> for HudiFileSlice {
+ fn from(f: &FileSlice) -> Self {
+ let file_id = f.file_id().to_string();
+ let partition_path = f.partition_path.to_string();
+ let creation_instant_time = f.creation_instant_time().to_string();
+ let base_file_name = f.base_file.file_name();
+ let file_metadata =
f.base_file.file_metadata.clone().unwrap_or_default();
+ let base_file_size = file_metadata.size;
+ let base_file_byte_size = file_metadata.byte_size;
+ let log_file_names = f.log_files.iter().map(|l|
l.file_name()).collect();
+ let num_records = file_metadata.num_records;
+ HudiFileSlice {
+ file_id,
+ partition_path,
+ creation_instant_time,
+ base_file_name,
+ base_file_size,
+ base_file_byte_size,
+ log_file_names,
+ num_records,
+ }
}
}
@@ -202,7 +258,7 @@ impl HudiTable {
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
- .map(|inner_vec|
inner_vec.iter().map(convert_file_slice).collect())
+ .map(|inner_vec|
inner_vec.iter().map(HudiFileSlice::from).collect())
.collect())
})
}
@@ -226,7 +282,7 @@ impl HudiTable {
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
- .map(|inner_vec|
inner_vec.iter().map(convert_file_slice).collect())
+ .map(|inner_vec|
inner_vec.iter().map(HudiFileSlice::from).collect())
.collect())
})
}
@@ -243,7 +299,7 @@ impl HudiTable {
let file_slices = rt()
.block_on(self.inner.get_file_slices(&filters.as_strs()))
.map_err(PythonError::from)?;
- Ok(file_slices.iter().map(convert_file_slice).collect())
+ Ok(file_slices.iter().map(HudiFileSlice::from).collect())
})
}
@@ -263,12 +319,37 @@ impl HudiTable {
.get_file_slices_as_of(timestamp, &filters.as_strs()),
)
.map_err(PythonError::from)?;
- Ok(file_slices.iter().map(convert_file_slice).collect())
+ Ok(file_slices.iter().map(HudiFileSlice::from).collect())
+ })
+ }
+
+ #[pyo3(signature = (start_timestamp=None, end_timestamp=None))]
+ fn get_file_slices_between(
+ &self,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ py: Python,
+ ) -> PyResult<Vec<HudiFileSlice>> {
+ py.allow_threads(|| {
+ let file_slices = rt()
+ .block_on(
+ self.inner
+ .get_file_slices_between(start_timestamp,
end_timestamp),
+ )
+ .map_err(PythonError::from)?;
+ Ok(file_slices.iter().map(HudiFileSlice::from).collect())
})
}
- fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
- let fg_reader = self.inner.create_file_group_reader();
+ #[pyo3(signature = (options=None))]
+ fn create_file_group_reader_with_options(
+ &self,
+ options: Option<HashMap<String, String>>,
+ ) -> PyResult<HudiFileGroupReader> {
+ let fg_reader = self
+ .inner
+ .create_file_group_reader_with_options(options.unwrap_or_default())
+ .map_err(PythonError::from)?;
Ok(HudiFileGroupReader { inner: fg_reader })
}
diff --git a/python/tests/test_table_builder.py
b/python/tests/test_table_builder.py
index 1ae5099..6a0cb22 100644
--- a/python/tests/test_table_builder.py
+++ b/python/tests/test_table_builder.py
@@ -113,11 +113,19 @@ def
test_read_table_returns_correct_data(get_sample_table):
@pytest.mark.parametrize(
"hudi_options,storage_options,options",
[
- ({"hoodie.read.as.of.timestamp": "20240402123035233"}, {}, {}),
- ({}, {}, {"hoodie.read.as.of.timestamp": "20240402123035233"}),
+ (
+ {"hoodie.read.file_group.start_timestamp": "resolved value"},
+ {"hoodie.read.file_group.start_timestamp": "not taking"},
+ {"hoodie.read.file_group.start_timestamp": "lower precedence"},
+ ),
+ (
+ {},
+ {"hoodie.read.file_group.start_timestamp": "not taking"},
+ {"hoodie.read.file_group.start_timestamp": "resolved value"},
+ ),
],
)
-def test_read_table_as_of_timestamp(
+def test_setting_table_options(
get_sample_table, hudi_options, storage_options, options
):
table_path = get_sample_table
@@ -129,37 +137,7 @@ def test_read_table_as_of_timestamp(
.build()
)
- batches = table.read_snapshot()
- t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
- assert t.to_pylist() == [
- {
- "_hoodie_commit_time": "20240402123035233",
- "ts": 1695046462179,
- "uuid": "9909a8b1-2d15-4d3d-8ec9-efc48c536a00",
- "fare": 33.9,
- },
- {
- "_hoodie_commit_time": "20240402123035233",
- "ts": 1695091554788,
- "uuid": "e96c4396-3fad-413a-a942-4cb36106d721",
- "fare": 27.7,
- },
- {
- "_hoodie_commit_time": "20240402123035233",
- "ts": 1695115999911,
- "uuid": "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa",
- "fare": 17.85,
- },
- {
- "_hoodie_commit_time": "20240402123035233",
- "ts": 1695159649087,
- "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330",
- "fare": 19.1,
- },
- {
- "_hoodie_commit_time": "20240402123035233",
- "ts": 1695516137016,
- "uuid": "e3cf430c-889d-4015-bc98-59bdce1e530c",
- "fare": 34.15,
- },
- ]
+ assert (
+ table.hudi_options().get("hoodie.read.file_group.start_timestamp")
+ == "resolved value"
+ )
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index c2931aa..2dca24d 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -75,8 +75,10 @@ def test_read_table_can_read_from_batches(get_sample_table):
file_slices = table.get_file_slices()
file_slice_paths = [f.base_file_relative_path() for f in file_slices]
- batch = table.create_file_group_reader().read_file_slice_by_base_file_path(
- file_slice_paths[0]
+ batch = (
+
table.create_file_group_reader_with_options().read_file_slice_by_base_file_path(
+ file_slice_paths[0]
+ )
)
t = pa.Table.from_batches([batch])
assert t.num_rows == 1