This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 65d8ed4 feat: support partition prune api (#119)
65d8ed4 is described below
commit 65d8ed49a63bd6578a17c88d8f0d24a9dc9981d7
Author: KnightChess <[email protected]>
AuthorDate: Sun Oct 6 07:52:24 2024 +0800
feat: support partition prune api (#119)
Add filtering capabilities to the table API; currently only partition fields
are applicable. Multiple predicates are ANDed together.
hudi_table.read_snapshot(&["foo != a", "bar >= 100"]);
Supported operators are: `>, >=, <, <=, =, !=`
---------
Co-authored-by: Shiyan Xu <[email protected]>
---
Cargo.toml | 2 +
crates/core/Cargo.toml | 2 +
crates/core/src/config/table.rs | 1 +
crates/core/src/file_group/mod.rs | 3 +-
crates/core/src/table/fs_view.rs | 175 +++++++++----
crates/core/src/table/mod.rs | 372 +++++++++++++++++++--------
crates/core/src/table/partition.rs | 508 +++++++++++++++++++++++++++++++++++++
crates/datafusion/src/lib.rs | 3 +-
python/src/internal.rs | 10 +-
9 files changed, 920 insertions(+), 156 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 024ba94..71f3ca0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -56,12 +56,14 @@ datafusion-common = { version = "= 42.0.0" }
datafusion-physical-expr = { version = "= 42.0.0" }
# serde
+percent-encoding = { version = "2.3.1" }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = { version = "1" }
# "stdlib"
anyhow = { version = "1.0.86" }
bytes = { version = "1" }
+once_cell = { version = "1.19.0" }
strum = { version = "0.26.3", features = ["derive"] }
strum_macros = "0.26.4"
url = { version = "2" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index dcaf547..1969d9a 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -65,6 +65,8 @@ datafusion = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
+percent-encoding = { workspace = true }
+once_cell = { workspace = true }
[dev-dependencies]
hudi-tests = { path = "../tests" }
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index f55df65..8a5d8d6 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -130,6 +130,7 @@ impl ConfigParser for HudiTableConfig {
match self {
Self::DatabaseName =>
Some(HudiConfigValue::String("default".to_string())),
Self::DropsPartitionFields =>
Some(HudiConfigValue::Boolean(false)),
+ Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
_ => None,
}
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index 7afe537..7cd1f47 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -257,7 +257,8 @@ mod tests {
.base_file
.commit_time,
"20240402123035233"
- )
+ );
+ assert!(fg.get_file_slice_as_of("-1").is_none());
}
#[test]
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 65cc2a9..9d249b8 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -20,15 +20,15 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use anyhow::{anyhow, Result};
-use arrow::record_batch::RecordBatch;
-use dashmap::DashMap;
-use url::Url;
-
use crate::config::HudiConfigs;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
+use crate::table::partition::PartitionPruner;
+use anyhow::{anyhow, Result};
+use arrow::record_batch::RecordBatch;
+use dashmap::DashMap;
+use url::Url;
#[derive(Clone, Debug)]
#[allow(dead_code)]
@@ -45,10 +45,7 @@ impl FileSystemView {
configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, &storage_options)?;
- let partition_paths = Self::load_partition_paths(&storage).await?;
- let partition_to_file_groups =
- Self::load_file_groups_for_partitions(&storage,
partition_paths).await?;
- let partition_to_file_groups =
Arc::new(DashMap::from_iter(partition_to_file_groups));
+ let partition_to_file_groups = Arc::new(DashMap::new());
Ok(FileSystemView {
configs,
storage,
@@ -56,7 +53,10 @@ impl FileSystemView {
})
}
- async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
+ async fn load_partition_paths(
+ storage: &Storage,
+ partition_pruner: &PartitionPruner,
+ ) -> Result<Vec<String>> {
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
.await?
@@ -70,7 +70,14 @@ impl FileSystemView {
if partition_paths.is_empty() {
partition_paths.push("".to_string())
}
- Ok(partition_paths)
+ if partition_pruner.is_empty() {
+ return Ok(partition_paths);
+ }
+
+ Ok(partition_paths
+ .into_iter()
+ .filter(|path_str| partition_pruner.should_include(path_str))
+ .collect())
}
async fn load_file_groups_for_partitions(
@@ -121,55 +128,50 @@ impl FileSystemView {
Ok(file_groups)
}
- pub fn get_file_slices_as_of(
+ pub async fn get_file_slices_as_of(
&self,
timestamp: &str,
+ partition_pruner: &PartitionPruner,
excluding_file_groups: &HashSet<FileGroup>,
) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
- for fgs in self.partition_to_file_groups.iter() {
- let fgs_ref = fgs.value();
+ if self.partition_to_file_groups.is_empty() {
+ let partition_paths =
+ Self::load_partition_paths(&self.storage,
partition_pruner).await?;
+ let partition_to_file_groups =
+ Self::load_file_groups_for_partitions(&self.storage,
partition_paths).await?;
+ partition_to_file_groups.into_iter().for_each(|pair| {
+ self.partition_to_file_groups.insert(pair.0, pair.1);
+ });
+ }
+ for mut fgs in self
+ .partition_to_file_groups
+ .iter_mut()
+ .filter(|item| partition_pruner.should_include(item.key()))
+ {
+ let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
if excluding_file_groups.contains(fg) {
continue;
}
- if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
+ if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
// TODO: pass ref instead of copying
- file_slices.push(fsl.clone());
+ fsl.load_stats(&self.storage).await?;
+ let immut_fsl: &FileSlice = fsl;
+ file_slices.push(immut_fsl.clone());
}
}
}
Ok(file_slices)
}
- pub async fn load_file_slices_stats_as_of(
- &self,
- timestamp: &str,
- exclude_file_groups: &HashSet<FileGroup>,
- ) -> Result<()> {
- for mut fgs in self.partition_to_file_groups.iter_mut() {
- let fgs_ref = fgs.value_mut();
- for fg in fgs_ref {
- if exclude_file_groups.contains(fg) {
- continue;
- }
- if let Some(file_slice) =
fg.get_file_slice_mut_as_of(timestamp) {
- file_slice
- .load_stats(&self.storage)
- .await
- .expect("Successful loading file stats.");
- }
- }
- }
- Ok(())
- }
-
pub async fn read_file_slice_by_path_unchecked(
&self,
relative_path: &str,
) -> Result<RecordBatch> {
self.storage.get_parquet_file_data(relative_path).await
}
+
pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
.await
@@ -178,20 +180,22 @@ impl FileSystemView {
#[cfg(test)]
mod tests {
+ use hudi_tests::TestTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
- use hudi_tests::TestTable;
-
use crate::config::HudiConfigs;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
+ use crate::table::partition::PartitionPruner;
+ use crate::table::Table;
#[tokio::test]
async fn get_partition_paths_for_nonpartitioned_table() {
let base_url = TestTable::V6Nonpartitioned.url();
let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
- let partition_paths = FileSystemView::load_partition_paths(&storage)
+ let partition_pruner = PartitionPruner::empty();
+ let partition_paths = FileSystemView::load_partition_paths(&storage,
&partition_pruner)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
@@ -203,7 +207,8 @@ mod tests {
async fn get_partition_paths_for_complexkeygen_table() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
- let partition_paths = FileSystemView::load_partition_paths(&storage)
+ let partition_pruner = PartitionPruner::empty();
+ let partition_paths = FileSystemView::load_partition_paths(&storage,
&partition_pruner)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
@@ -229,15 +234,97 @@ mod tests {
.await
.unwrap();
+ assert!(fs_view.partition_to_file_groups.is_empty());
+ let partition_pruner = PartitionPruner::empty();
let excludes = HashSet::new();
let file_slices = fs_view
- .get_file_slices_as_of("20240418173551906", &excludes)
+ .get_file_slices_as_of("20240418173551906", &partition_pruner,
&excludes)
+ .await
+ .unwrap();
+ assert_eq!(fs_view.partition_to_file_groups.len(), 1);
+ assert_eq!(file_slices.len(), 1);
+ let fg_ids = file_slices
+ .iter()
+ .map(|fsl| fsl.file_group_id())
+ .collect::<Vec<_>>();
+ assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
+ for fsl in file_slices.iter() {
+ assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 4);
+ }
+ }
+
+ #[tokio::test]
+ async fn fs_view_get_latest_file_slices_with_replace_commit() {
+ let base_url =
TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let fs_view = FileSystemView::new(
+ Arc::new(base_url),
+ Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::empty()),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(fs_view.partition_to_file_groups.len(), 0);
+ let partition_pruner = PartitionPruner::empty();
+ let excludes = &hudi_table
+ .timeline
+ .get_replaced_file_groups()
+ .await
+ .unwrap();
+ let file_slices = fs_view
+ .get_file_slices_as_of("20240707001303088", &partition_pruner,
excludes)
+ .await
.unwrap();
+ assert_eq!(fs_view.partition_to_file_groups.len(), 3);
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
.collect::<Vec<_>>();
- assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"])
+ assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
+ for fsl in file_slices.iter() {
+ assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 1);
+ }
+ }
+
+ #[tokio::test]
+ async fn fs_view_get_latest_file_slices_with_partition_filters() {
+ let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let fs_view = FileSystemView::new(
+ Arc::new(base_url),
+ Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::empty()),
+ )
+ .await
+ .unwrap();
+ assert_eq!(fs_view.partition_to_file_groups.len(), 0);
+ let excludes = &hudi_table
+ .timeline
+ .get_replaced_file_groups()
+ .await
+ .unwrap();
+ let partition_schema =
hudi_table.get_partition_schema().await.unwrap();
+ let partition_pruner = PartitionPruner::new(
+ &["byteField < 20", "shortField = 300"],
+ &partition_schema,
+ hudi_table.configs.as_ref(),
+ )
+ .unwrap();
+ let file_slices = fs_view
+ .get_file_slices_as_of("20240418173235694", &partition_pruner,
excludes)
+ .await
+ .unwrap();
+ assert_eq!(fs_view.partition_to_file_groups.len(), 1);
+ assert_eq!(file_slices.len(), 1);
+ let fg_ids = file_slices
+ .iter()
+ .map(|fsl| fsl.file_group_id())
+ .collect::<Vec<_>>();
+ assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
+ for fsl in file_slices.iter() {
+ assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 2);
+ }
}
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 57f0222..0d802e4 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -52,7 +52,7 @@
//! pub async fn test() {
//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
-//! let record_read = hudi_table.read_snapshot().await.unwrap();
+//! let record_read = hudi_table.read_snapshot(&[]).await.unwrap();
//! }
//! ```
//! 4. get file slice
@@ -66,7 +66,7 @@
//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
//! let file_slices = hudi_table
-//! .split_file_slices(2)
+//! .split_file_slices(2, &[])
//! .await.unwrap();
//! // define every parquet task reader how many slice
//! let mut parquet_file_groups: Vec<Vec<String>> = Vec::new();
@@ -84,7 +84,7 @@
//! }
//! ```
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::env;
use std::path::PathBuf;
use std::str::FromStr;
@@ -92,7 +92,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::record_batch::RecordBatch;
-use arrow_schema::Schema;
+use arrow_schema::{Field, Schema};
use strum::IntoEnumIterator;
use url::Url;
@@ -103,6 +103,7 @@ use TableTypeValue::CopyOnWrite;
use crate::config::internal::HudiInternalConfig;
use crate::config::read::HudiReadConfig;
use crate::config::read::HudiReadConfig::AsOfTimestamp;
+use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::config::HUDI_CONF_DIR;
@@ -110,9 +111,11 @@ use crate::file_group::FileSlice;
use crate::storage::utils::{empty_options, parse_config_data, parse_uri};
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
+use crate::table::partition::PartitionPruner;
use crate::table::timeline::Timeline;
mod fs_view;
+mod partition;
mod timeline;
/// Hudi Table in-memory
@@ -307,10 +310,34 @@ impl Table {
self.timeline.get_latest_schema().await
}
+ /// Get the latest partition [Schema] of the table
+ pub async fn get_partition_schema(&self) -> Result<Schema> {
+ let partition_fields: HashSet<String> = self
+ .configs
+ .get_or_default(PartitionFields)
+ .to::<Vec<String>>()
+ .into_iter()
+ .collect();
+
+ let schema = self.get_schema().await?;
+ let partition_fields: Vec<Arc<Field>> = schema
+ .fields()
+ .iter()
+ .filter(|field| partition_fields.contains(field.name()))
+ .cloned()
+ .collect();
+
+ Ok(Schema::new(partition_fields))
+ }
+
/// Split the file into a specified number of parts
- pub async fn split_file_slices(&self, n: usize) ->
Result<Vec<Vec<FileSlice>>> {
+ pub async fn split_file_slices(
+ &self,
+ n: usize,
+ filters: &[&str],
+ ) -> Result<Vec<Vec<FileSlice>>> {
let n = std::cmp::max(1, n);
- let file_slices = self.get_file_slices().await?;
+ let file_slices = self.get_file_slices(filters).await?;
let chunk_size = (file_slices.len() + n - 1) / n;
Ok(file_slices
@@ -322,46 +349,54 @@ impl Table {
/// Get all the [FileSlice]s in the table.
///
/// If the [AsOfTimestamp] configuration is set, the file slices at the
specified timestamp will be returned.
- pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
+ pub async fn get_file_slices(&self, filters: &[&str]) ->
Result<Vec<FileSlice>> {
if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
- self.get_file_slices_as_of(timestamp.to::<String>().as_str())
+ self.get_file_slices_as_of(timestamp.to::<String>().as_str(),
filters)
.await
} else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
- self.get_file_slices_as_of(timestamp).await
+ self.get_file_slices_as_of(timestamp, filters).await
} else {
Ok(Vec::new())
}
}
/// Get all the [FileSlice]s at a given timestamp, as a time travel query.
- async fn get_file_slices_as_of(&self, timestamp: &str) ->
Result<Vec<FileSlice>> {
+ async fn get_file_slices_as_of(
+ &self,
+ timestamp: &str,
+ filters: &[&str],
+ ) -> Result<Vec<FileSlice>> {
let excludes = self.timeline.get_replaced_file_groups().await?;
+ let partition_schema = self.get_partition_schema().await?;
+ let partition_pruner =
+ PartitionPruner::new(filters, &partition_schema,
self.configs.as_ref())?;
self.file_system_view
- .load_file_slices_stats_as_of(timestamp, &excludes)
+ .get_file_slices_as_of(timestamp, &partition_pruner, &excludes)
.await
- .context("Fail to load file slice stats.")?;
- self.file_system_view
- .get_file_slices_as_of(timestamp, &excludes)
}
/// Get all the latest records in the table.
///
/// If the [AsOfTimestamp] configuration is set, the records at the
specified timestamp will be returned.
- pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
+ pub async fn read_snapshot(&self, filters: &[&str]) ->
Result<Vec<RecordBatch>> {
if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
- self.read_snapshot_as_of(timestamp.to::<String>().as_str())
+ self.read_snapshot_as_of(timestamp.to::<String>().as_str(),
filters)
.await
} else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
- self.read_snapshot_as_of(timestamp).await
+ self.read_snapshot_as_of(timestamp, filters).await
} else {
Ok(Vec::new())
}
}
/// Get all the records in the table at a given timestamp, as a time
travel query.
- async fn read_snapshot_as_of(&self, timestamp: &str) ->
Result<Vec<RecordBatch>> {
+ async fn read_snapshot_as_of(
+ &self,
+ timestamp: &str,
+ filters: &[&str],
+ ) -> Result<Vec<RecordBatch>> {
let file_slices = self
- .get_file_slices_as_of(timestamp)
+ .get_file_slices_as_of(timestamp, filters)
.await
.context(format!("Failed to get file slices as of {}",
timestamp))?;
let mut batches = Vec::new();
@@ -375,9 +410,9 @@ impl Table {
}
#[cfg(test)]
- async fn get_file_paths(&self) -> Result<Vec<String>> {
+ async fn get_file_paths_with_filters(&self, filters: &[&str]) ->
Result<Vec<String>> {
let mut file_paths = Vec::new();
- for f in self.get_file_slices().await? {
+ for f in self.get_file_slices(filters).await? {
file_paths.push(f.base_file_path().to_string());
}
Ok(file_paths)
@@ -411,6 +446,7 @@ impl Table {
#[cfg(test)]
mod tests {
+ use arrow_array::StringArray;
use std::collections::HashSet;
use std::fs::canonicalize;
use std::path::PathBuf;
@@ -462,7 +498,7 @@ mod tests {
.collect();
assert_eq!(
fields,
- Vec::from([
+ vec![
"_hoodie_commit_time",
"_hoodie_commit_seqno",
"_hoodie_record_key",
@@ -497,96 +533,23 @@ mod tests {
"child_struct",
"child_field1",
"child_field2"
- ])
+ ]
);
}
#[tokio::test]
- async fn hudi_table_read_file_slice() {
- let base_url = TestTable::V6Nonpartitioned.url();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
- let batches = hudi_table
- .read_file_slice_by_path(
-
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
- )
- .await
- .unwrap();
- assert_eq!(batches.num_rows(), 4);
- assert_eq!(batches.num_columns(), 21);
- }
-
- #[tokio::test]
- async fn hudi_table_get_file_paths() {
- let base_url = TestTable::V6ComplexkeygenHivestyle.url();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
- assert_eq!(hudi_table.timeline.instants.len(), 2);
- let actual: HashSet<String> =
- HashSet::from_iter(hudi_table.get_file_paths().await.unwrap());
- let expected: HashSet<String> = HashSet::from_iter(vec![
-
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
-
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
-
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
- ]
- .into_iter().map(|f| { join_url_segments(&base_url,
&[f]).unwrap().to_string() })
- .collect::<Vec<_>>());
- assert_eq!(actual, expected);
- }
-
- #[tokio::test]
- async fn hudi_table_get_file_slices_as_of_timestamps() {
- let base_url = TestTable::V6Nonpartitioned.url();
-
+ async fn hudi_table_get_partition_schema() {
+ let base_url = TestTable::V6TimebasedkeygenNonhivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let file_slices = hudi_table.get_file_slices().await.unwrap();
- assert_eq!(
- file_slices
- .iter()
- .map(|f| f.base_file_relative_path())
- .collect::<Vec<_>>(),
-
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
- );
-
- // as of the latest timestamp
- let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
- let hudi_table = Table::new_with_options(base_url.path(), opts)
- .await
- .unwrap();
- let file_slices = hudi_table.get_file_slices().await.unwrap();
- assert_eq!(
- file_slices
- .iter()
- .map(|f| f.base_file_relative_path())
- .collect::<Vec<_>>(),
-
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
- );
-
- // as of just smaller than the latest timestamp
- let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
- let hudi_table = Table::new_with_options(base_url.path(), opts)
- .await
- .unwrap();
- let file_slices = hudi_table.get_file_slices().await.unwrap();
- assert_eq!(
- file_slices
- .iter()
- .map(|f| f.base_file_relative_path())
- .collect::<Vec<_>>(),
-
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
- );
-
- // as of non-exist old timestamp
- let opts = [(AsOfTimestamp.as_ref(), "0")];
- let hudi_table = Table::new_with_options(base_url.path(), opts)
+ let fields: Vec<String> = hudi_table
+ .get_partition_schema()
.await
- .unwrap();
- let file_slices = hudi_table.get_file_slices().await.unwrap();
- assert_eq!(
- file_slices
- .iter()
- .map(|f| f.base_file_relative_path())
- .collect::<Vec<_>>(),
- Vec::<String>::new()
- );
+ .unwrap()
+ .flattened_fields()
+ .into_iter()
+ .map(|f| f.name().to_string())
+ .collect();
+ assert_eq!(fields, vec!["ts_str"]);
}
#[tokio::test]
@@ -672,7 +635,10 @@ mod tests {
assert!(panic::catch_unwind(||
configs.get_or_default(IsHiveStylePartitioning)).is_err());
assert!(panic::catch_unwind(||
configs.get_or_default(IsPartitionPathUrlencoded)).is_err());
assert!(panic::catch_unwind(||
configs.get_or_default(KeyGeneratorClass)).is_err());
- assert!(panic::catch_unwind(||
configs.get_or_default(PartitionFields)).is_err());
+ assert!(configs
+ .get_or_default(PartitionFields)
+ .to::<Vec<String>>()
+ .is_empty());
assert!(panic::catch_unwind(||
configs.get_or_default(PrecombineField)).is_err());
assert!(configs.get_or_default(PopulatesMetaFields).to::<bool>());
assert!(panic::catch_unwind(||
configs.get_or_default(RecordKeyFields)).is_err());
@@ -754,4 +720,196 @@ mod tests {
assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
env::remove_var(HUDI_CONF_DIR)
}
+
+ #[tokio::test]
+ async fn hudi_table_read_file_slice() {
+ let base_url = TestTable::V6Nonpartitioned.url();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let batches = hudi_table
+ .read_file_slice_by_path(
+
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
+ )
+ .await
+ .unwrap();
+ assert_eq!(batches.num_rows(), 4);
+ assert_eq!(batches.num_columns(), 21);
+ }
+
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_as_of_timestamps() {
+ let base_url = TestTable::V6Nonpartitioned.url();
+
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+ assert_eq!(
+ file_slices
+ .iter()
+ .map(|f| f.base_file_relative_path())
+ .collect::<Vec<_>>(),
+
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
+ );
+
+ // as of the latest timestamp
+ let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
+ let hudi_table = Table::new_with_options(base_url.path(), opts)
+ .await
+ .unwrap();
+ let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+ assert_eq!(
+ file_slices
+ .iter()
+ .map(|f| f.base_file_relative_path())
+ .collect::<Vec<_>>(),
+
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
+ );
+
+ // as of just smaller than the latest timestamp
+ let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
+ let hudi_table = Table::new_with_options(base_url.path(), opts)
+ .await
+ .unwrap();
+ let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+ assert_eq!(
+ file_slices
+ .iter()
+ .map(|f| f.base_file_relative_path())
+ .collect::<Vec<_>>(),
+
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
+ );
+
+ // as of non-exist old timestamp
+ let opts = [(AsOfTimestamp.as_ref(), "0")];
+ let hudi_table = Table::new_with_options(base_url.path(), opts)
+ .await
+ .unwrap();
+ let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+ assert_eq!(
+ file_slices
+ .iter()
+ .map(|f| f.base_file_relative_path())
+ .collect::<Vec<_>>(),
+ Vec::<String>::new()
+ );
+ }
+
+ #[tokio::test]
+ async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() {
+ let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ assert_eq!(hudi_table.timeline.instants.len(), 2);
+
+ let partition_filters = &[];
+ let actual: HashSet<String> = HashSet::from_iter(
+ hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap(),
+ );
+ let expected: HashSet<String> = HashSet::from_iter(
+ vec![
+
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+
"30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
+ ]
+ .into_iter()
+ .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+ .collect::<Vec<_>>(),
+ );
+ assert_eq!(actual, expected);
+
+ let partition_filters = &["byteField >= 10", "byteField < 30"];
+ let actual: HashSet<String> = HashSet::from_iter(
+ hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap(),
+ );
+ let expected: HashSet<String> = HashSet::from_iter(
+ vec![
+
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+ ]
+ .into_iter()
+ .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+ .collect::<Vec<_>>(),
+ );
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() {
+ let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ assert_eq!(hudi_table.timeline.instants.len(), 2);
+
+ let partition_filters = &[];
+ let actual: HashSet<String> = HashSet::from_iter(
+ hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap(),
+ );
+ let expected: HashSet<String> = HashSet::from_iter(vec![
+
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
+
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
+
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
+ ]
+ .into_iter().map(|f| { join_url_segments(&base_url,
&[f]).unwrap().to_string() })
+ .collect::<Vec<_>>());
+ assert_eq!(actual, expected);
+
+ let partition_filters = &["byteField >= 10", "byteField < 20",
"shortField != 100"];
+ let actual: HashSet<String> = HashSet::from_iter(
+ hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap(),
+ );
+ let expected: HashSet<String> = HashSet::from_iter(vec![
+
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
+ ]
+ .into_iter().map(|f| { join_url_segments(&base_url,
&[f]).unwrap().to_string() })
+ .collect::<Vec<_>>());
+ assert_eq!(actual, expected);
+ }
+
+ #[tokio::test]
+ async fn hudi_table_read_snapshot_for_complex_keygen_and_hive_style() {
+ let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let partition_filters = &["byteField >= 10", "byteField < 20",
"shortField != 100"];
+ let records =
hudi_table.read_snapshot(partition_filters).await.unwrap();
+ assert_eq!(records.len(), 1);
+ assert_eq!(records[0].num_rows(), 2);
+ let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
+ records[0]
+ .column_by_name("_hoodie_partition_path")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .iter()
+ .map(|s| s.unwrap())
+ .collect::<Vec<_>>(),
+ );
+ let expected_partition_paths: HashSet<&str> =
+ HashSet::from_iter(vec!["byteField=10/shortField=300"]);
+ assert_eq!(actual_partition_paths, expected_partition_paths);
+
+ let actual_file_names: HashSet<&str> = HashSet::from_iter(
+ records[0]
+ .column_by_name("_hoodie_file_name")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .iter()
+ .map(|s| s.unwrap())
+ .collect::<Vec<_>>(),
+ );
+ let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
+
"a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
+ ]);
+ assert_eq!(actual_file_names, expected_file_names);
+ }
}
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
new file mode 100644
index 0000000..e34b738
--- /dev/null
+++ b/crates/core/src/table/partition.rs
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
+use anyhow::Result;
+use anyhow::{anyhow, Context};
+use arrow_array::{ArrayRef, Scalar, StringArray};
+use arrow_cast::{cast_with_options, CastOptions};
+use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
+use arrow_schema::{DataType, Field, Schema};
+use once_cell::sync::Lazy;
+use std::cmp::PartialEq;
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+#[derive(Debug, Clone)]
+pub struct PartitionPruner {
+ schema: Arc<Schema>,
+ is_hive_style: bool,
+ is_url_encoded: bool,
+ and_filters: Vec<PartitionFilter>,
+}
+
+impl PartitionPruner {
+ pub fn new(
+ and_filters: &[&str],
+ partition_schema: &Schema,
+ hudi_configs: &HudiConfigs,
+ ) -> Result<Self> {
+ let and_filters = and_filters
+ .iter()
+ .map(|filter| PartitionFilter::try_from((*filter,
partition_schema)))
+ .collect::<Result<Vec<PartitionFilter>>>()?;
+
+ let schema = Arc::new(partition_schema.clone());
+ let is_hive_style: bool = hudi_configs
+ .get_or_default(HudiTableConfig::IsHiveStylePartitioning)
+ .to();
+ let is_url_encoded: bool = hudi_configs
+ .get_or_default(HudiTableConfig::IsPartitionPathUrlencoded)
+ .to();
+ Ok(PartitionPruner {
+ schema,
+ is_hive_style,
+ is_url_encoded,
+ and_filters,
+ })
+ }
+
+ pub fn empty() -> Self {
+ PartitionPruner {
+ schema: Arc::new(Schema::empty()),
+ is_hive_style: false,
+ is_url_encoded: false,
+ and_filters: Vec::new(),
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.and_filters.is_empty()
+ }
+
+ pub fn should_include(&self, partition_path: &str) -> bool {
+ let segments = match self.parse_segments(partition_path) {
+ Ok(s) => s,
+ Err(_) => return true, // Include the partition regardless of
parsing error
+ };
+
+ self.and_filters.iter().all(|filter| {
+ match segments.get(filter.field.name()) {
+ Some(segment_value) => {
+ let comparison_result = match filter.operator {
+ Operator::Eq => eq(segment_value, &filter.value),
+ Operator::Ne => neq(segment_value, &filter.value),
+ Operator::Lt => lt(segment_value, &filter.value),
+ Operator::Lte => lt_eq(segment_value, &filter.value),
+ Operator::Gt => gt(segment_value, &filter.value),
+ Operator::Gte => gt_eq(segment_value, &filter.value),
+ };
+
+ match comparison_result {
+ Ok(scalar) => scalar.value(0),
+ Err(_) => true, // Include the partition when
comparison error occurs
+ }
+ }
+ None => true, // Include the partition when filtering field
does not match any field in the partition
+ }
+ })
+ }
+
+ fn parse_segments(&self, partition_path: &str) -> Result<HashMap<String,
Scalar<ArrayRef>>> {
+ let partition_path = if self.is_url_encoded {
+ percent_encoding::percent_decode(partition_path.as_bytes())
+ .decode_utf8()?
+ .into_owned()
+ } else {
+ partition_path.to_string()
+ };
+
+ let parts: Vec<&str> = partition_path.split('/').collect();
+
+ if parts.len() != self.schema.fields().len() {
+ return Err(anyhow!(
+ "Partition path should have {} part(s) but got {}",
+ self.schema.fields().len(),
+ parts.len()
+ ));
+ }
+
+ self.schema
+ .fields()
+ .iter()
+ .zip(parts)
+ .map(|(field, part)| {
+ let value = if self.is_hive_style {
+ let (name, value) = part.split_once('=').ok_or_else(|| {
+ anyhow!("Partition path should be hive-style but got
{}", part)
+ })?;
+ if name != field.name() {
+ return Err(anyhow!(
+ "Partition path should contain {} but got {}",
+ field.name(),
+ name
+ ));
+ }
+ value
+ } else {
+ part
+ };
+ let scalar = PartitionFilter::cast_value(&[value],
field.data_type())?;
+ Ok((field.name().to_string(), scalar))
+ })
+ .collect()
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum Operator {
+ Eq,
+ Ne,
+ Lt,
+ Lte,
+ Gt,
+ Gte,
+}
+
+impl Operator {
+ const TOKEN_OP_PAIRS: [(&'static str, Operator); 6] = [
+ ("=", Operator::Eq),
+ ("!=", Operator::Ne),
+ ("<", Operator::Lt),
+ ("<=", Operator::Lte),
+ (">", Operator::Gt),
+ (">=", Operator::Gte),
+ ];
+
+ fn supported_tokens() -> &'static [&'static str] {
+ static TOKENS: Lazy<Vec<&'static str>> = Lazy::new(|| {
+ let mut tokens: Vec<&'static str> = Operator::TOKEN_OP_PAIRS
+ .iter()
+ .map(|&(token, _)| token)
+ .collect();
+ tokens.sort_by_key(|t| std::cmp::Reverse(t.len()));
+ tokens
+ });
+ &TOKENS
+ }
+}
+
+impl FromStr for Operator {
+ type Err = anyhow::Error;
+
+ fn from_str(s: &str) -> Result<Self> {
+ Operator::TOKEN_OP_PAIRS
+ .iter()
+ .find_map(|&(token, op)| if token == s { Some(op) } else { None })
+ .ok_or_else(|| anyhow!("Unsupported operator: {}", s))
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct PartitionFilter {
+ field: Field,
+ operator: Operator,
+ value: Scalar<ArrayRef>,
+}
+
+impl TryFrom<(&str, &Schema)> for PartitionFilter {
+ type Error = anyhow::Error;
+
+ fn try_from((s, partition_schema): (&str, &Schema)) -> Result<Self> {
+ let (field_name, operator_str, value_str) = Self::parse_to_parts(s)?;
+
+ let field: &Field = partition_schema
+ .field_with_name(field_name)
+ .with_context(|| format!("Field '{}' not found in partition
schema", field_name))?;
+
+ let operator = Operator::from_str(operator_str)
+ .with_context(|| format!("Unsupported operator: {}",
operator_str))?;
+
+ let value = &[value_str];
+ let value = Self::cast_value(value, field.data_type())
+ .with_context(|| format!("Unable to cast {:?} as {:?}", value,
field.data_type()))?;
+
+ let field = field.clone();
+ Ok(PartitionFilter {
+ field,
+ operator,
+ value,
+ })
+ }
+}
+
+impl PartitionFilter {
+ fn parse_to_parts(s: &str) -> Result<(&str, &str, &str)> {
+ let s = s.trim();
+
+ let (index, op) = Operator::supported_tokens()
+ .iter()
+ .filter_map(|&op| s.find(op).map(|index| (index, op)))
+ .min_by_key(|(index, _)| *index)
+ .ok_or_else(|| anyhow!("No valid operator found in the filter
string"))?;
+
+ let (field, rest) = s.split_at(index);
+ let (_, value) = rest.split_at(op.len());
+
+ let field = field.trim();
+ let value = value.trim();
+
+ if field.is_empty() || value.is_empty() {
+ return Err(anyhow!(
+ "Invalid filter format: missing field name or value"
+ ));
+ }
+
+ Ok((field, op, value))
+ }
+
+ fn cast_value(value: &[&str; 1], data_type: &DataType) ->
Result<Scalar<ArrayRef>> {
+ let cast_options = CastOptions {
+ safe: false,
+ format_options: Default::default(),
+ };
+ let value = StringArray::from(Vec::from(value));
+ Ok(Scalar::new(cast_with_options(
+ &value,
+ data_type,
+ &cast_options,
+ )?))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::config::table::HudiTableConfig::{
+ IsHiveStylePartitioning, IsPartitionPathUrlencoded,
+ };
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_array::Datum;
+ use hudi_tests::assert_not;
+ use std::str::FromStr;
+
+ fn create_test_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("date", DataType::Date32, false),
+ Field::new("category", DataType::Utf8, false),
+ Field::new("count", DataType::Int32, false),
+ ])
+ }
+
+ #[test]
+ fn test_partition_filter_try_from_valid() {
+ let schema = create_test_schema();
+ let filter_str = "date = 2023-01-01";
+ let filter = PartitionFilter::try_from((filter_str, &schema));
+ assert!(filter.is_ok());
+ let filter = filter.unwrap();
+ assert_eq!(filter.field.name(), "date");
+ assert_eq!(filter.operator, Operator::Eq);
+ assert_eq!(filter.value.get().0.len(), 1);
+
+ let filter_str = "category!=foo";
+ let filter = PartitionFilter::try_from((filter_str, &schema));
+ assert!(filter.is_ok());
+ let filter = filter.unwrap();
+ assert_eq!(filter.field.name(), "category");
+ assert_eq!(filter.operator, Operator::Ne);
+ assert_eq!(filter.value.get().0.len(), 1);
+ assert_eq!(
+ StringArray::from(filter.value.into_inner().to_data()).value(0),
+ "foo"
+ )
+ }
+
+ #[test]
+ fn test_partition_filter_try_from_invalid_field() {
+ let schema = create_test_schema();
+ let filter_str = "invalid_field = 2023-01-01";
+ let filter = PartitionFilter::try_from((filter_str, &schema));
+ assert!(filter.is_err());
+ assert!(filter
+ .unwrap_err()
+ .to_string()
+ .contains("not found in partition schema"));
+ }
+
+ #[test]
+ fn test_partition_filter_try_from_invalid_operator() {
+ let schema = create_test_schema();
+ let filter_str = "date ?? 2023-01-01";
+ let filter = PartitionFilter::try_from((filter_str, &schema));
+ assert!(filter.is_err());
+ assert!(filter
+ .unwrap_err()
+ .to_string()
+ .contains("No valid operator found"));
+ }
+
+ #[test]
+ fn test_partition_filter_try_from_invalid_value() {
+ let schema = create_test_schema();
+ let filter_str = "count = not_a_number";
+ let filter = PartitionFilter::try_from((filter_str, &schema));
+ assert!(filter.is_err());
+ assert!(filter.unwrap_err().to_string().contains("Unable to cast"));
+ }
+
+ #[test]
+ fn test_parse_to_parts_valid() {
+ let result = PartitionFilter::parse_to_parts("date = 2023-01-01");
+ assert!(result.is_ok());
+ let (field, operator, value) = result.unwrap();
+ assert_eq!(field, "date");
+ assert_eq!(operator, "=");
+ assert_eq!(value, "2023-01-01");
+ }
+
+ #[test]
+ fn test_parse_to_parts_no_operator() {
+ let result = PartitionFilter::parse_to_parts("date 2023-01-01");
+ assert!(result.is_err());
+ assert!(result
+ .unwrap_err()
+ .to_string()
+ .contains("No valid operator found"));
+ }
+
+ #[test]
+ fn test_parse_to_parts_multiple_operators() {
+ let result = PartitionFilter::parse_to_parts("count >= 10 <= 20");
+ assert!(result.is_ok());
+ let (field, operator, value) = result.unwrap();
+ assert_eq!(field, "count");
+ assert_eq!(operator, ">=");
+ assert_eq!(value, "10 <= 20");
+ }
+
+ #[test]
+ fn test_partition_filter_try_from_all_operators() {
+ let schema = create_test_schema();
+ for &op in Operator::supported_tokens() {
+ let filter_str = format!("count {} 10", op);
+ let filter = PartitionFilter::try_from((filter_str.as_str(),
&schema));
+ assert!(filter.is_ok(), "Failed for operator: {}", op);
+ let filter = filter.unwrap();
+ assert_eq!(filter.field.name(), "count");
+ assert_eq!(filter.operator, Operator::from_str(op).unwrap());
+ }
+ }
+
+ #[test]
+ fn test_operator_from_str() {
+ assert_eq!(Operator::from_str("=").unwrap(), Operator::Eq);
+ assert_eq!(Operator::from_str("!=").unwrap(), Operator::Ne);
+ assert_eq!(Operator::from_str("<").unwrap(), Operator::Lt);
+ assert_eq!(Operator::from_str("<=").unwrap(), Operator::Lte);
+ assert_eq!(Operator::from_str(">").unwrap(), Operator::Gt);
+ assert_eq!(Operator::from_str(">=").unwrap(), Operator::Gte);
+ assert!(Operator::from_str("??").is_err());
+ }
+
+ #[test]
+ fn test_operator_supported_tokens() {
+ assert_eq!(
+ Operator::supported_tokens(),
+ &["!=", "<=", ">=", "=", "<", ">"]
+ );
+ }
+
+ fn create_hudi_configs(is_hive_style: bool, is_url_encoded: bool) ->
HudiConfigs {
+ HudiConfigs::new(HashMap::from([
+ (
+ IsHiveStylePartitioning.as_ref().to_string(),
+ is_hive_style.to_string(),
+ ),
+ (
+ IsPartitionPathUrlencoded.as_ref().to_string(),
+ is_url_encoded.to_string(),
+ ),
+ ]))
+ }
+ #[test]
+ fn test_partition_pruner_new() {
+ let schema = create_test_schema();
+ let configs = create_hudi_configs(true, false);
+ let filters = vec!["date > 2023-01-01", "category = A"];
+
+ let pruner = PartitionPruner::new(&filters, &schema, &configs);
+ assert!(pruner.is_ok());
+
+ let pruner = pruner.unwrap();
+ assert_eq!(pruner.and_filters.len(), 2);
+ assert!(pruner.is_hive_style);
+ assert_not!(pruner.is_url_encoded);
+ }
+
+ #[test]
+ fn test_partition_pruner_empty() {
+ let pruner = PartitionPruner::empty();
+ assert!(pruner.is_empty());
+ assert_not!(pruner.is_hive_style);
+ assert_not!(pruner.is_url_encoded);
+ }
+
+ #[test]
+ fn test_partition_pruner_is_empty() {
+ let schema = create_test_schema();
+ let configs = create_hudi_configs(false, false);
+
+ let pruner_empty = PartitionPruner::new(&[], &schema,
&configs).unwrap();
+ assert!(pruner_empty.is_empty());
+
+ let pruner_non_empty =
+ PartitionPruner::new(&["date > 2023-01-01"], &schema,
&configs).unwrap();
+ assert_not!(pruner_non_empty.is_empty());
+ }
+
+ #[test]
+ fn test_partition_pruner_should_include() {
+ let schema = create_test_schema();
+ let configs = create_hudi_configs(true, false);
+ let filters = vec!["date > 2023-01-01", "category = A", "count <=
100"];
+
+ let pruner = PartitionPruner::new(&filters, &schema,
&configs).unwrap();
+
+ assert!(pruner.should_include("date=2023-02-01/category=A/count=10"));
+ assert!(pruner.should_include("date=2023-02-01/category=A/count=100"));
+
assert_not!(pruner.should_include("date=2022-12-31/category=A/count=10"));
+
assert_not!(pruner.should_include("date=2023-02-01/category=B/count=10"));
+ }
+
+ #[test]
+ fn test_partition_pruner_parse_segments() {
+ let schema = create_test_schema();
+ let configs = create_hudi_configs(true, false);
+ let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
+
+ let segments = pruner
+ .parse_segments("date=2023-02-01/category=A/count=10")
+ .unwrap();
+ assert_eq!(segments.len(), 3);
+ assert!(segments.contains_key("date"));
+ assert!(segments.contains_key("category"));
+ assert!(segments.contains_key("count"));
+ }
+
+ #[test]
+ fn test_partition_pruner_url_encoded() {
+ let schema = create_test_schema();
+ let configs = create_hudi_configs(true, true);
+ let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
+
+ let segments = pruner
+ .parse_segments("date%3D2023-02-01%2Fcategory%3DA%2Fcount%3D10")
+ .unwrap();
+ assert_eq!(segments.len(), 3);
+ assert!(segments.contains_key("date"));
+ assert!(segments.contains_key("category"));
+ assert!(segments.contains_key("count"));
+ }
+
+ #[test]
+ fn test_partition_pruner_invalid_path() {
+ let schema = create_test_schema();
+ let configs = create_hudi_configs(true, false);
+ let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
+
+ assert!(pruner.parse_segments("invalid/path").is_err());
+ }
+}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 3e2fdaf..6aa2035 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -102,7 +102,8 @@ impl TableProvider for HudiDataSource {
let file_slices = self
.table
- .split_file_slices(self.get_input_partitions())
+ // TODO: implement supports_filters_pushdown() to pass filters to
Hudi table API
+ .split_file_slices(self.get_input_partitions(), &[])
.await
.map_err(|e| Execution(format!("Failed to get file slices from
Hudi table: {}", e)))?;
let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 7c52930..53ea580 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -109,8 +109,9 @@ impl HudiTable {
}
fn split_file_slices(&self, n: usize, py: Python) ->
PyResult<Vec<Vec<HudiFileSlice>>> {
+ // TODO: support passing filters
py.allow_threads(|| {
- let file_slices = rt().block_on(self._table.split_file_slices(n))?;
+ let file_slices = rt().block_on(self._table.split_file_slices(n,
&[]))?;
Ok(file_slices
.iter()
.map(|inner_vec|
inner_vec.iter().map(convert_file_slice).collect())
@@ -119,8 +120,9 @@ impl HudiTable {
}
fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
+ // TODO: support passing filters
py.allow_threads(|| {
- let file_slices = rt().block_on(self._table.get_file_slices())?;
+ let file_slices = rt().block_on(self._table.get_file_slices(&[]))?;
Ok(file_slices.iter().map(convert_file_slice).collect())
})
}
@@ -131,7 +133,9 @@ impl HudiTable {
}
fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
- rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
+ // TODO: support passing filters
+ rt().block_on(self._table.read_snapshot(&[]))?
+ .to_pyarrow(py)
}
}