This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 65d8ed4  feat: support partition prune api (#119)
65d8ed4 is described below

commit 65d8ed49a63bd6578a17c88d8f0d24a9dc9981d7
Author: KnightChess <[email protected]>
AuthorDate: Sun Oct 6 07:52:24 2024 +0800

    feat: support partition prune api (#119)
    
    Add filtering capabilities to the table API; currently only partition fields 
are applicable. Multiple predicates are ANDed together.
    
    
    hudi_table.read_snapshot(&["foo != a", "bar >= 100"]);
    
    Supported operators are: `>, >=, <, <=, =, !=`
    
    ---------
    
    Co-authored-by: Shiyan Xu <[email protected]>
---
 Cargo.toml                         |   2 +
 crates/core/Cargo.toml             |   2 +
 crates/core/src/config/table.rs    |   1 +
 crates/core/src/file_group/mod.rs  |   3 +-
 crates/core/src/table/fs_view.rs   | 175 +++++++++----
 crates/core/src/table/mod.rs       | 372 +++++++++++++++++++--------
 crates/core/src/table/partition.rs | 508 +++++++++++++++++++++++++++++++++++++
 crates/datafusion/src/lib.rs       |   3 +-
 python/src/internal.rs             |  10 +-
 9 files changed, 920 insertions(+), 156 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 024ba94..71f3ca0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -56,12 +56,14 @@ datafusion-common = { version = "= 42.0.0" }
 datafusion-physical-expr = { version = "= 42.0.0" }
 
 # serde
+percent-encoding = { version = "2.3.1" }
 serde = { version = "1.0.203", features = ["derive"] }
 serde_json = { version = "1" }
 
 # "stdlib"
 anyhow = { version = "1.0.86" }
 bytes = { version = "1" }
+once_cell = { version = "1.19.0" }
 strum = { version = "0.26.3", features = ["derive"] }
 strum_macros = "0.26.4"
 url = { version = "2" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index dcaf547..1969d9a 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -65,6 +65,8 @@ datafusion = { workspace = true, optional = true }
 datafusion-expr = { workspace = true, optional = true }
 datafusion-common = { workspace = true, optional = true }
 datafusion-physical-expr = { workspace = true, optional = true }
+percent-encoding = { workspace = true }
+once_cell = { workspace = true }
 
 [dev-dependencies]
 hudi-tests = { path = "../tests" }
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index f55df65..8a5d8d6 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -130,6 +130,7 @@ impl ConfigParser for HudiTableConfig {
         match self {
             Self::DatabaseName => 
Some(HudiConfigValue::String("default".to_string())),
             Self::DropsPartitionFields => 
Some(HudiConfigValue::Boolean(false)),
+            Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
             Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
             _ => None,
         }
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index 7afe537..7cd1f47 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -257,7 +257,8 @@ mod tests {
                 .base_file
                 .commit_time,
             "20240402123035233"
-        )
+        );
+        assert!(fg.get_file_slice_as_of("-1").is_none());
     }
 
     #[test]
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 65cc2a9..9d249b8 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -20,15 +20,15 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-use anyhow::{anyhow, Result};
-use arrow::record_batch::RecordBatch;
-use dashmap::DashMap;
-use url::Url;
-
 use crate::config::HudiConfigs;
 use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
+use crate::table::partition::PartitionPruner;
+use anyhow::{anyhow, Result};
+use arrow::record_batch::RecordBatch;
+use dashmap::DashMap;
+use url::Url;
 
 #[derive(Clone, Debug)]
 #[allow(dead_code)]
@@ -45,10 +45,7 @@ impl FileSystemView {
         configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
         let storage = Storage::new(base_url, &storage_options)?;
-        let partition_paths = Self::load_partition_paths(&storage).await?;
-        let partition_to_file_groups =
-            Self::load_file_groups_for_partitions(&storage, 
partition_paths).await?;
-        let partition_to_file_groups = 
Arc::new(DashMap::from_iter(partition_to_file_groups));
+        let partition_to_file_groups = Arc::new(DashMap::new());
         Ok(FileSystemView {
             configs,
             storage,
@@ -56,7 +53,10 @@ impl FileSystemView {
         })
     }
 
-    async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
+    async fn load_partition_paths(
+        storage: &Storage,
+        partition_pruner: &PartitionPruner,
+    ) -> Result<Vec<String>> {
         let top_level_dirs: Vec<String> = storage
             .list_dirs(None)
             .await?
@@ -70,7 +70,14 @@ impl FileSystemView {
         if partition_paths.is_empty() {
             partition_paths.push("".to_string())
         }
-        Ok(partition_paths)
+        if partition_pruner.is_empty() {
+            return Ok(partition_paths);
+        }
+
+        Ok(partition_paths
+            .into_iter()
+            .filter(|path_str| partition_pruner.should_include(path_str))
+            .collect())
     }
 
     async fn load_file_groups_for_partitions(
@@ -121,55 +128,50 @@ impl FileSystemView {
         Ok(file_groups)
     }
 
-    pub fn get_file_slices_as_of(
+    pub async fn get_file_slices_as_of(
         &self,
         timestamp: &str,
+        partition_pruner: &PartitionPruner,
         excluding_file_groups: &HashSet<FileGroup>,
     ) -> Result<Vec<FileSlice>> {
         let mut file_slices = Vec::new();
-        for fgs in self.partition_to_file_groups.iter() {
-            let fgs_ref = fgs.value();
+        if self.partition_to_file_groups.is_empty() {
+            let partition_paths =
+                Self::load_partition_paths(&self.storage, 
partition_pruner).await?;
+            let partition_to_file_groups =
+                Self::load_file_groups_for_partitions(&self.storage, 
partition_paths).await?;
+            partition_to_file_groups.into_iter().for_each(|pair| {
+                self.partition_to_file_groups.insert(pair.0, pair.1);
+            });
+        }
+        for mut fgs in self
+            .partition_to_file_groups
+            .iter_mut()
+            .filter(|item| partition_pruner.should_include(item.key()))
+        {
+            let fgs_ref = fgs.value_mut();
             for fg in fgs_ref {
                 if excluding_file_groups.contains(fg) {
                     continue;
                 }
-                if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
+                if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
                     // TODO: pass ref instead of copying
-                    file_slices.push(fsl.clone());
+                    fsl.load_stats(&self.storage).await?;
+                    let immut_fsl: &FileSlice = fsl;
+                    file_slices.push(immut_fsl.clone());
                 }
             }
         }
         Ok(file_slices)
     }
 
-    pub async fn load_file_slices_stats_as_of(
-        &self,
-        timestamp: &str,
-        exclude_file_groups: &HashSet<FileGroup>,
-    ) -> Result<()> {
-        for mut fgs in self.partition_to_file_groups.iter_mut() {
-            let fgs_ref = fgs.value_mut();
-            for fg in fgs_ref {
-                if exclude_file_groups.contains(fg) {
-                    continue;
-                }
-                if let Some(file_slice) = 
fg.get_file_slice_mut_as_of(timestamp) {
-                    file_slice
-                        .load_stats(&self.storage)
-                        .await
-                        .expect("Successful loading file stats.");
-                }
-            }
-        }
-        Ok(())
-    }
-
     pub async fn read_file_slice_by_path_unchecked(
         &self,
         relative_path: &str,
     ) -> Result<RecordBatch> {
         self.storage.get_parquet_file_data(relative_path).await
     }
+
     pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) -> 
Result<RecordBatch> {
         
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
             .await
@@ -178,20 +180,22 @@ impl FileSystemView {
 
 #[cfg(test)]
 mod tests {
+    use hudi_tests::TestTable;
     use std::collections::{HashMap, HashSet};
     use std::sync::Arc;
 
-    use hudi_tests::TestTable;
-
     use crate::config::HudiConfigs;
     use crate::storage::Storage;
     use crate::table::fs_view::FileSystemView;
+    use crate::table::partition::PartitionPruner;
+    use crate::table::Table;
 
     #[tokio::test]
     async fn get_partition_paths_for_nonpartitioned_table() {
         let base_url = TestTable::V6Nonpartitioned.url();
         let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
-        let partition_paths = FileSystemView::load_partition_paths(&storage)
+        let partition_pruner = PartitionPruner::empty();
+        let partition_paths = FileSystemView::load_partition_paths(&storage, 
&partition_pruner)
             .await
             .unwrap();
         let partition_path_set: HashSet<&str> =
@@ -203,7 +207,8 @@ mod tests {
     async fn get_partition_paths_for_complexkeygen_table() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
-        let partition_paths = FileSystemView::load_partition_paths(&storage)
+        let partition_pruner = PartitionPruner::empty();
+        let partition_paths = FileSystemView::load_partition_paths(&storage, 
&partition_pruner)
             .await
             .unwrap();
         let partition_path_set: HashSet<&str> =
@@ -229,15 +234,97 @@ mod tests {
         .await
         .unwrap();
 
+        assert!(fs_view.partition_to_file_groups.is_empty());
+        let partition_pruner = PartitionPruner::empty();
         let excludes = HashSet::new();
         let file_slices = fs_view
-            .get_file_slices_as_of("20240418173551906", &excludes)
+            .get_file_slices_as_of("20240418173551906", &partition_pruner, 
&excludes)
+            .await
+            .unwrap();
+        assert_eq!(fs_view.partition_to_file_groups.len(), 1);
+        assert_eq!(file_slices.len(), 1);
+        let fg_ids = file_slices
+            .iter()
+            .map(|fsl| fsl.file_group_id())
+            .collect::<Vec<_>>();
+        assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
+        for fsl in file_slices.iter() {
+            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 4);
+        }
+    }
+
+    #[tokio::test]
+    async fn fs_view_get_latest_file_slices_with_replace_commit() {
+        let base_url = 
TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let fs_view = FileSystemView::new(
+            Arc::new(base_url),
+            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::empty()),
+        )
+        .await
+        .unwrap();
+
+        assert_eq!(fs_view.partition_to_file_groups.len(), 0);
+        let partition_pruner = PartitionPruner::empty();
+        let excludes = &hudi_table
+            .timeline
+            .get_replaced_file_groups()
+            .await
+            .unwrap();
+        let file_slices = fs_view
+            .get_file_slices_as_of("20240707001303088", &partition_pruner, 
excludes)
+            .await
             .unwrap();
+        assert_eq!(fs_view.partition_to_file_groups.len(), 3);
         assert_eq!(file_slices.len(), 1);
         let fg_ids = file_slices
             .iter()
             .map(|fsl| fsl.file_group_id())
             .collect::<Vec<_>>();
-        assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"])
+        assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
+        for fsl in file_slices.iter() {
+            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 1);
+        }
+    }
+
+    #[tokio::test]
+    async fn fs_view_get_latest_file_slices_with_partition_filters() {
+        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let fs_view = FileSystemView::new(
+            Arc::new(base_url),
+            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::empty()),
+        )
+        .await
+        .unwrap();
+        assert_eq!(fs_view.partition_to_file_groups.len(), 0);
+        let excludes = &hudi_table
+            .timeline
+            .get_replaced_file_groups()
+            .await
+            .unwrap();
+        let partition_schema = 
hudi_table.get_partition_schema().await.unwrap();
+        let partition_pruner = PartitionPruner::new(
+            &["byteField < 20", "shortField = 300"],
+            &partition_schema,
+            hudi_table.configs.as_ref(),
+        )
+        .unwrap();
+        let file_slices = fs_view
+            .get_file_slices_as_of("20240418173235694", &partition_pruner, 
excludes)
+            .await
+            .unwrap();
+        assert_eq!(fs_view.partition_to_file_groups.len(), 1);
+        assert_eq!(file_slices.len(), 1);
+        let fg_ids = file_slices
+            .iter()
+            .map(|fsl| fsl.file_group_id())
+            .collect::<Vec<_>>();
+        assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
+        for fsl in file_slices.iter() {
+            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 2);
+        }
     }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 57f0222..0d802e4 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -52,7 +52,7 @@
 //! pub async fn test() {
 //!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
 //!     let hudi_table = Table::new(base_uri.path()).await.unwrap();
-//!     let record_read = hudi_table.read_snapshot().await.unwrap();
+//!     let record_read = hudi_table.read_snapshot(&[]).await.unwrap();
 //! }
 //! ```
 //! 4. get file slice
@@ -66,7 +66,7 @@
 //!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
 //!     let hudi_table = Table::new(base_uri.path()).await.unwrap();
 //!     let file_slices = hudi_table
-//!             .split_file_slices(2)
+//!             .split_file_slices(2, &[])
 //!             .await.unwrap();
 //!     // define every parquet task reader how many slice
 //!     let mut parquet_file_groups: Vec<Vec<String>> = Vec::new();
@@ -84,7 +84,7 @@
 //! }
 //! ```
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::env;
 use std::path::PathBuf;
 use std::str::FromStr;
@@ -92,7 +92,7 @@ use std::sync::Arc;
 
 use anyhow::{anyhow, Context, Result};
 use arrow::record_batch::RecordBatch;
-use arrow_schema::Schema;
+use arrow_schema::{Field, Schema};
 use strum::IntoEnumIterator;
 use url::Url;
 
@@ -103,6 +103,7 @@ use TableTypeValue::CopyOnWrite;
 use crate::config::internal::HudiInternalConfig;
 use crate::config::read::HudiReadConfig;
 use crate::config::read::HudiReadConfig::AsOfTimestamp;
+use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
 use crate::config::HUDI_CONF_DIR;
@@ -110,9 +111,11 @@ use crate::file_group::FileSlice;
 use crate::storage::utils::{empty_options, parse_config_data, parse_uri};
 use crate::storage::Storage;
 use crate::table::fs_view::FileSystemView;
+use crate::table::partition::PartitionPruner;
 use crate::table::timeline::Timeline;
 
 mod fs_view;
+mod partition;
 mod timeline;
 
 /// Hudi Table in-memory
@@ -307,10 +310,34 @@ impl Table {
         self.timeline.get_latest_schema().await
     }
 
+    /// Get the latest partition [Schema] of the table
+    pub async fn get_partition_schema(&self) -> Result<Schema> {
+        let partition_fields: HashSet<String> = self
+            .configs
+            .get_or_default(PartitionFields)
+            .to::<Vec<String>>()
+            .into_iter()
+            .collect();
+
+        let schema = self.get_schema().await?;
+        let partition_fields: Vec<Arc<Field>> = schema
+            .fields()
+            .iter()
+            .filter(|field| partition_fields.contains(field.name()))
+            .cloned()
+            .collect();
+
+        Ok(Schema::new(partition_fields))
+    }
+
     /// Split the file into a specified number of parts
-    pub async fn split_file_slices(&self, n: usize) -> 
Result<Vec<Vec<FileSlice>>> {
+    pub async fn split_file_slices(
+        &self,
+        n: usize,
+        filters: &[&str],
+    ) -> Result<Vec<Vec<FileSlice>>> {
         let n = std::cmp::max(1, n);
-        let file_slices = self.get_file_slices().await?;
+        let file_slices = self.get_file_slices(filters).await?;
         let chunk_size = (file_slices.len() + n - 1) / n;
 
         Ok(file_slices
@@ -322,46 +349,54 @@ impl Table {
     /// Get all the [FileSlice]s in the table.
     ///
     /// If the [AsOfTimestamp] configuration is set, the file slices at the 
specified timestamp will be returned.
-    pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
+    pub async fn get_file_slices(&self, filters: &[&str]) -> 
Result<Vec<FileSlice>> {
         if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
-            self.get_file_slices_as_of(timestamp.to::<String>().as_str())
+            self.get_file_slices_as_of(timestamp.to::<String>().as_str(), 
filters)
                 .await
         } else if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp() {
-            self.get_file_slices_as_of(timestamp).await
+            self.get_file_slices_as_of(timestamp, filters).await
         } else {
             Ok(Vec::new())
         }
     }
 
     /// Get all the [FileSlice]s at a given timestamp, as a time travel query.
-    async fn get_file_slices_as_of(&self, timestamp: &str) -> 
Result<Vec<FileSlice>> {
+    async fn get_file_slices_as_of(
+        &self,
+        timestamp: &str,
+        filters: &[&str],
+    ) -> Result<Vec<FileSlice>> {
         let excludes = self.timeline.get_replaced_file_groups().await?;
+        let partition_schema = self.get_partition_schema().await?;
+        let partition_pruner =
+            PartitionPruner::new(filters, &partition_schema, 
self.configs.as_ref())?;
         self.file_system_view
-            .load_file_slices_stats_as_of(timestamp, &excludes)
+            .get_file_slices_as_of(timestamp, &partition_pruner, &excludes)
             .await
-            .context("Fail to load file slice stats.")?;
-        self.file_system_view
-            .get_file_slices_as_of(timestamp, &excludes)
     }
 
     /// Get all the latest records in the table.
     ///
     /// If the [AsOfTimestamp] configuration is set, the records at the 
specified timestamp will be returned.
-    pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
+    pub async fn read_snapshot(&self, filters: &[&str]) -> 
Result<Vec<RecordBatch>> {
         if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
-            self.read_snapshot_as_of(timestamp.to::<String>().as_str())
+            self.read_snapshot_as_of(timestamp.to::<String>().as_str(), 
filters)
                 .await
         } else if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp() {
-            self.read_snapshot_as_of(timestamp).await
+            self.read_snapshot_as_of(timestamp, filters).await
         } else {
             Ok(Vec::new())
         }
     }
 
     /// Get all the records in the table at a given timestamp, as a time 
travel query.
-    async fn read_snapshot_as_of(&self, timestamp: &str) -> 
Result<Vec<RecordBatch>> {
+    async fn read_snapshot_as_of(
+        &self,
+        timestamp: &str,
+        filters: &[&str],
+    ) -> Result<Vec<RecordBatch>> {
         let file_slices = self
-            .get_file_slices_as_of(timestamp)
+            .get_file_slices_as_of(timestamp, filters)
             .await
             .context(format!("Failed to get file slices as of {}", 
timestamp))?;
         let mut batches = Vec::new();
@@ -375,9 +410,9 @@ impl Table {
     }
 
     #[cfg(test)]
-    async fn get_file_paths(&self) -> Result<Vec<String>> {
+    async fn get_file_paths_with_filters(&self, filters: &[&str]) -> 
Result<Vec<String>> {
         let mut file_paths = Vec::new();
-        for f in self.get_file_slices().await? {
+        for f in self.get_file_slices(filters).await? {
             file_paths.push(f.base_file_path().to_string());
         }
         Ok(file_paths)
@@ -411,6 +446,7 @@ impl Table {
 
 #[cfg(test)]
 mod tests {
+    use arrow_array::StringArray;
     use std::collections::HashSet;
     use std::fs::canonicalize;
     use std::path::PathBuf;
@@ -462,7 +498,7 @@ mod tests {
             .collect();
         assert_eq!(
             fields,
-            Vec::from([
+            vec![
                 "_hoodie_commit_time",
                 "_hoodie_commit_seqno",
                 "_hoodie_record_key",
@@ -497,96 +533,23 @@ mod tests {
                 "child_struct",
                 "child_field1",
                 "child_field2"
-            ])
+            ]
         );
     }
 
     #[tokio::test]
-    async fn hudi_table_read_file_slice() {
-        let base_url = TestTable::V6Nonpartitioned.url();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let batches = hudi_table
-            .read_file_slice_by_path(
-                
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
-            )
-            .await
-            .unwrap();
-        assert_eq!(batches.num_rows(), 4);
-        assert_eq!(batches.num_columns(), 21);
-    }
-
-    #[tokio::test]
-    async fn hudi_table_get_file_paths() {
-        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
-        assert_eq!(hudi_table.timeline.instants.len(), 2);
-        let actual: HashSet<String> =
-            HashSet::from_iter(hudi_table.get_file_paths().await.unwrap());
-        let expected: HashSet<String> = HashSet::from_iter(vec![
-            
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
-            
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
-            
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
-        ]
-            .into_iter().map(|f| { join_url_segments(&base_url, 
&[f]).unwrap().to_string() })
-            .collect::<Vec<_>>());
-        assert_eq!(actual, expected);
-    }
-
-    #[tokio::test]
-    async fn hudi_table_get_file_slices_as_of_timestamps() {
-        let base_url = TestTable::V6Nonpartitioned.url();
-
+    async fn hudi_table_get_partition_schema() {
+        let base_url = TestTable::V6TimebasedkeygenNonhivestyle.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let file_slices = hudi_table.get_file_slices().await.unwrap();
-        assert_eq!(
-            file_slices
-                .iter()
-                .map(|f| f.base_file_relative_path())
-                .collect::<Vec<_>>(),
-            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
-        );
-
-        // as of the latest timestamp
-        let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
-        let hudi_table = Table::new_with_options(base_url.path(), opts)
-            .await
-            .unwrap();
-        let file_slices = hudi_table.get_file_slices().await.unwrap();
-        assert_eq!(
-            file_slices
-                .iter()
-                .map(|f| f.base_file_relative_path())
-                .collect::<Vec<_>>(),
-            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
-        );
-
-        // as of just smaller than the latest timestamp
-        let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
-        let hudi_table = Table::new_with_options(base_url.path(), opts)
-            .await
-            .unwrap();
-        let file_slices = hudi_table.get_file_slices().await.unwrap();
-        assert_eq!(
-            file_slices
-                .iter()
-                .map(|f| f.base_file_relative_path())
-                .collect::<Vec<_>>(),
-            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
-        );
-
-        // as of non-exist old timestamp
-        let opts = [(AsOfTimestamp.as_ref(), "0")];
-        let hudi_table = Table::new_with_options(base_url.path(), opts)
+        let fields: Vec<String> = hudi_table
+            .get_partition_schema()
             .await
-            .unwrap();
-        let file_slices = hudi_table.get_file_slices().await.unwrap();
-        assert_eq!(
-            file_slices
-                .iter()
-                .map(|f| f.base_file_relative_path())
-                .collect::<Vec<_>>(),
-            Vec::<String>::new()
-        );
+            .unwrap()
+            .flattened_fields()
+            .into_iter()
+            .map(|f| f.name().to_string())
+            .collect();
+        assert_eq!(fields, vec!["ts_str"]);
     }
 
     #[tokio::test]
@@ -672,7 +635,10 @@ mod tests {
         assert!(panic::catch_unwind(|| 
configs.get_or_default(IsHiveStylePartitioning)).is_err());
         assert!(panic::catch_unwind(|| 
configs.get_or_default(IsPartitionPathUrlencoded)).is_err());
         assert!(panic::catch_unwind(|| 
configs.get_or_default(KeyGeneratorClass)).is_err());
-        assert!(panic::catch_unwind(|| 
configs.get_or_default(PartitionFields)).is_err());
+        assert!(configs
+            .get_or_default(PartitionFields)
+            .to::<Vec<String>>()
+            .is_empty());
         assert!(panic::catch_unwind(|| 
configs.get_or_default(PrecombineField)).is_err());
         assert!(configs.get_or_default(PopulatesMetaFields).to::<bool>());
         assert!(panic::catch_unwind(|| 
configs.get_or_default(RecordKeyFields)).is_err());
@@ -754,4 +720,196 @@ mod tests {
         assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
         env::remove_var(HUDI_CONF_DIR)
     }
+
+    #[tokio::test]
+    async fn hudi_table_read_file_slice() {
+        let base_url = TestTable::V6Nonpartitioned.url();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let batches = hudi_table
+            .read_file_slice_by_path(
+                
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
+            )
+            .await
+            .unwrap();
+        assert_eq!(batches.num_rows(), 4);
+        assert_eq!(batches.num_columns(), 21);
+    }
+
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_as_of_timestamps() {
+        let base_url = TestTable::V6Nonpartitioned.url();
+
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+        assert_eq!(
+            file_slices
+                .iter()
+                .map(|f| f.base_file_relative_path())
+                .collect::<Vec<_>>(),
+            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
+        );
+
+        // as of the latest timestamp
+        let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
+        let hudi_table = Table::new_with_options(base_url.path(), opts)
+            .await
+            .unwrap();
+        let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+        assert_eq!(
+            file_slices
+                .iter()
+                .map(|f| f.base_file_relative_path())
+                .collect::<Vec<_>>(),
+            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
+        );
+
+        // as of just smaller than the latest timestamp
+        let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
+        let hudi_table = Table::new_with_options(base_url.path(), opts)
+            .await
+            .unwrap();
+        let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+        assert_eq!(
+            file_slices
+                .iter()
+                .map(|f| f.base_file_relative_path())
+                .collect::<Vec<_>>(),
+            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
+        );
+
+        // as of non-exist old timestamp
+        let opts = [(AsOfTimestamp.as_ref(), "0")];
+        let hudi_table = Table::new_with_options(base_url.path(), opts)
+            .await
+            .unwrap();
+        let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+        assert_eq!(
+            file_slices
+                .iter()
+                .map(|f| f.base_file_relative_path())
+                .collect::<Vec<_>>(),
+            Vec::<String>::new()
+        );
+    }
+
+    #[tokio::test]
+    async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() {
+        let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        assert_eq!(hudi_table.timeline.instants.len(), 2);
+
+        let partition_filters = &[];
+        let actual: HashSet<String> = HashSet::from_iter(
+            hudi_table
+                .get_file_paths_with_filters(partition_filters)
+                .await
+                .unwrap(),
+        );
+        let expected: HashSet<String> = HashSet::from_iter(
+            vec![
+                
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+                
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+                
"30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
+            ]
+            .into_iter()
+            .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+            .collect::<Vec<_>>(),
+        );
+        assert_eq!(actual, expected);
+
+        let partition_filters = &["byteField >= 10", "byteField < 30"];
+        let actual: HashSet<String> = HashSet::from_iter(
+            hudi_table
+                .get_file_paths_with_filters(partition_filters)
+                .await
+                .unwrap(),
+        );
+        let expected: HashSet<String> = HashSet::from_iter(
+            vec![
+                
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+                
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+            ]
+            .into_iter()
+            .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+            .collect::<Vec<_>>(),
+        );
+        assert_eq!(actual, expected);
+    }
+
+    #[tokio::test]
+    async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() {
+        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        assert_eq!(hudi_table.timeline.instants.len(), 2);
+
+        let partition_filters = &[];
+        let actual: HashSet<String> = HashSet::from_iter(
+            hudi_table
+                .get_file_paths_with_filters(partition_filters)
+                .await
+                .unwrap(),
+        );
+        let expected: HashSet<String> = HashSet::from_iter(vec![
+            
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
+            
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
+            
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
+        ]
+            .into_iter().map(|f| { join_url_segments(&base_url, 
&[f]).unwrap().to_string() })
+            .collect::<Vec<_>>());
+        assert_eq!(actual, expected);
+
+        let partition_filters = &["byteField >= 10", "byteField < 20", 
"shortField != 100"];
+        let actual: HashSet<String> = HashSet::from_iter(
+            hudi_table
+                .get_file_paths_with_filters(partition_filters)
+                .await
+                .unwrap(),
+        );
+        let expected: HashSet<String> = HashSet::from_iter(vec![
+            
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
+        ]
+            .into_iter().map(|f| { join_url_segments(&base_url, 
&[f]).unwrap().to_string() })
+            .collect::<Vec<_>>());
+        assert_eq!(actual, expected);
+    }
+
+    #[tokio::test]
+    async fn hudi_table_read_snapshot_for_complex_keygen_and_hive_style() {
+        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let partition_filters = &["byteField >= 10", "byteField < 20", 
"shortField != 100"];
+        let records = 
hudi_table.read_snapshot(partition_filters).await.unwrap();
+        assert_eq!(records.len(), 1);
+        assert_eq!(records[0].num_rows(), 2);
+        let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
+            records[0]
+                .column_by_name("_hoodie_partition_path")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap()
+                .iter()
+                .map(|s| s.unwrap())
+                .collect::<Vec<_>>(),
+        );
+        let expected_partition_paths: HashSet<&str> =
+            HashSet::from_iter(vec!["byteField=10/shortField=300"]);
+        assert_eq!(actual_partition_paths, expected_partition_paths);
+
+        let actual_file_names: HashSet<&str> = HashSet::from_iter(
+            records[0]
+                .column_by_name("_hoodie_file_name")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap()
+                .iter()
+                .map(|s| s.unwrap())
+                .collect::<Vec<_>>(),
+        );
+        let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
+            
"a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
+        ]);
+        assert_eq!(actual_file_names, expected_file_names);
+    }
 }
diff --git a/crates/core/src/table/partition.rs 
b/crates/core/src/table/partition.rs
new file mode 100644
index 0000000..e34b738
--- /dev/null
+++ b/crates/core/src/table/partition.rs
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
+use anyhow::Result;
+use anyhow::{anyhow, Context};
+use arrow_array::{ArrayRef, Scalar, StringArray};
+use arrow_cast::{cast_with_options, CastOptions};
+use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
+use arrow_schema::{DataType, Field, Schema};
+use once_cell::sync::Lazy;
+use std::cmp::PartialEq;
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
/// Prunes a Hudi table's partitions using a set of AND-ed partition filters.
#[derive(Debug, Clone)]
pub struct PartitionPruner {
    /// Schema describing the table's partition fields (names and data types).
    schema: Arc<Schema>,
    /// Whether partition paths are hive-style, i.e. `name=value` segments.
    is_hive_style: bool,
    /// Whether partition paths are percent (URL) encoded.
    is_url_encoded: bool,
    /// Filters combined with logical AND; all must pass for a partition to be kept.
    and_filters: Vec<PartitionFilter>,
}
+
impl PartitionPruner {
    /// Builds a pruner from raw filter strings (e.g. `"count >= 10"`)
    /// validated against `partition_schema`, reading the path-encoding
    /// settings (hive-style, URL-encoding) from `hudi_configs`.
    ///
    /// Errors if any filter string cannot be parsed into a
    /// [`PartitionFilter`] against the given schema.
    pub fn new(
        and_filters: &[&str],
        partition_schema: &Schema,
        hudi_configs: &HudiConfigs,
    ) -> Result<Self> {
        let and_filters = and_filters
            .iter()
            .map(|filter| PartitionFilter::try_from((*filter, partition_schema)))
            .collect::<Result<Vec<PartitionFilter>>>()?;

        let schema = Arc::new(partition_schema.clone());
        let is_hive_style: bool = hudi_configs
            .get_or_default(HudiTableConfig::IsHiveStylePartitioning)
            .to();
        let is_url_encoded: bool = hudi_configs
            .get_or_default(HudiTableConfig::IsPartitionPathUrlencoded)
            .to();
        Ok(PartitionPruner {
            schema,
            is_hive_style,
            is_url_encoded,
            and_filters,
        })
    }

    /// A pruner with no filters: it includes every partition.
    pub fn empty() -> Self {
        PartitionPruner {
            schema: Arc::new(Schema::empty()),
            is_hive_style: false,
            is_url_encoded: false,
            and_filters: Vec::new(),
        }
    }

    /// Returns `true` when there are no filters, i.e. no pruning will occur.
    pub fn is_empty(&self) -> bool {
        self.and_filters.is_empty()
    }

    /// Decides whether the partition at `partition_path` passes all filters.
    ///
    /// Pruning is deliberately lenient: if the path cannot be parsed, if a
    /// comparison kernel errors, or if a filter's field is absent from the
    /// parsed segments, the partition is INCLUDED rather than dropped, so an
    /// error can never silently hide data.
    pub fn should_include(&self, partition_path: &str) -> bool {
        let segments = match self.parse_segments(partition_path) {
            Ok(s) => s,
            Err(_) => return true, // Include the partition regardless of parsing error
        };

        self.and_filters.iter().all(|filter| {
            match segments.get(filter.field.name()) {
                Some(segment_value) => {
                    // Dispatch to the arrow comparison kernel matching the operator.
                    let comparison_result = match filter.operator {
                        Operator::Eq => eq(segment_value, &filter.value),
                        Operator::Ne => neq(segment_value, &filter.value),
                        Operator::Lt => lt(segment_value, &filter.value),
                        Operator::Lte => lt_eq(segment_value, &filter.value),
                        Operator::Gt => gt(segment_value, &filter.value),
                        Operator::Gte => gt_eq(segment_value, &filter.value),
                    };

                    match comparison_result {
                        Ok(scalar) => scalar.value(0),
                        Err(_) => true, // Include the partition when comparison error occurs
                    }
                }
                None => true, // Include the partition when filtering field does not match any field in the partition
            }
        })
    }

    /// Splits `partition_path` into per-field scalar values keyed by field name.
    ///
    /// Percent-decodes the path first when the table URL-encodes partition
    /// paths. The number of `/`-separated parts must match the partition
    /// schema; for hive-style tables each part must be `name=value` with the
    /// name matching the corresponding schema field. Each value string is
    /// cast to the field's data type.
    fn parse_segments(&self, partition_path: &str) -> Result<HashMap<String, Scalar<ArrayRef>>> {
        let partition_path = if self.is_url_encoded {
            percent_encoding::percent_decode(partition_path.as_bytes())
                .decode_utf8()?
                .into_owned()
        } else {
            partition_path.to_string()
        };

        let parts: Vec<&str> = partition_path.split('/').collect();

        if parts.len() != self.schema.fields().len() {
            return Err(anyhow!(
                "Partition path should have {} part(s) but got {}",
                self.schema.fields().len(),
                parts.len()
            ));
        }

        self.schema
            .fields()
            .iter()
            .zip(parts)
            .map(|(field, part)| {
                let value = if self.is_hive_style {
                    let (name, value) = part.split_once('=').ok_or_else(|| {
                        anyhow!("Partition path should be hive-style but got {}", part)
                    })?;
                    if name != field.name() {
                        return Err(anyhow!(
                            "Partition path should contain {} but got {}",
                            field.name(),
                            name
                        ));
                    }
                    value
                } else {
                    part
                };
                let scalar = PartitionFilter::cast_value(&[value], field.data_type())?;
                Ok((field.name().to_string(), scalar))
            })
            .collect()
    }
}
+
/// Comparison operator supported in partition filter expressions.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Operator {
    Eq,
    Ne,
    Lt,
    Lte,
    Gt,
    Gte,
}

impl Operator {
    /// Token-to-operator lookup table; the single source of truth for
    /// parsing and for the list of supported tokens.
    const TOKEN_OP_PAIRS: [(&'static str, Operator); 6] = [
        ("=", Operator::Eq),
        ("!=", Operator::Ne),
        ("<", Operator::Lt),
        ("<=", Operator::Lte),
        (">", Operator::Gt),
        (">=", Operator::Gte),
    ];

    /// Supported operator tokens, sorted longest-first so that scanning a
    /// filter string matches e.g. `">="` before `">"` at the same position
    /// (the stable sort keeps declaration order within equal lengths).
    fn supported_tokens() -> &'static [&'static str] {
        // Computed once lazily and cached for the process lifetime.
        static TOKENS: Lazy<Vec<&'static str>> = Lazy::new(|| {
            let mut tokens: Vec<&'static str> = Operator::TOKEN_OP_PAIRS
                .iter()
                .map(|&(token, _)| token)
                .collect();
            tokens.sort_by_key(|t| std::cmp::Reverse(t.len()));
            tokens
        });
        &TOKENS
    }
}

impl FromStr for Operator {
    type Err = anyhow::Error;

    /// Parses an exact operator token (e.g. `"!="`); errors on anything else.
    fn from_str(s: &str) -> Result<Self> {
        Operator::TOKEN_OP_PAIRS
            .iter()
            .find_map(|&(token, op)| if token == s { Some(op) } else { None })
            .ok_or_else(|| anyhow!("Unsupported operator: {}", s))
    }
}
+
/// A single `field <op> value` predicate over one partition field.
#[derive(Debug, Clone)]
pub struct PartitionFilter {
    /// The partition field this filter applies to, resolved from the schema.
    field: Field,
    /// The comparison operator.
    operator: Operator,
    /// The comparison value, already cast to the field's data type.
    value: Scalar<ArrayRef>,
}
+
+impl TryFrom<(&str, &Schema)> for PartitionFilter {
+    type Error = anyhow::Error;
+
+    fn try_from((s, partition_schema): (&str, &Schema)) -> Result<Self> {
+        let (field_name, operator_str, value_str) = Self::parse_to_parts(s)?;
+
+        let field: &Field = partition_schema
+            .field_with_name(field_name)
+            .with_context(|| format!("Field '{}' not found in partition 
schema", field_name))?;
+
+        let operator = Operator::from_str(operator_str)
+            .with_context(|| format!("Unsupported operator: {}", 
operator_str))?;
+
+        let value = &[value_str];
+        let value = Self::cast_value(value, field.data_type())
+            .with_context(|| format!("Unable to cast {:?} as {:?}", value, 
field.data_type()))?;
+
+        let field = field.clone();
+        Ok(PartitionFilter {
+            field,
+            operator,
+            value,
+        })
+    }
+}
+
impl PartitionFilter {
    /// Splits a filter expression into `(field, operator, value)` string parts.
    ///
    /// Scans for the earliest operator occurrence in the string. Because
    /// [`Operator::supported_tokens`] is ordered longest-first and
    /// `min_by_key` returns the first minimum, a two-char token such as
    /// `">="` wins over `">"` at the same position. Everything before the
    /// operator is the field name and everything after is the value, both
    /// trimmed. Errors when no operator is found or either side is empty.
    fn parse_to_parts(s: &str) -> Result<(&str, &str, &str)> {
        let s = s.trim();

        let (index, op) = Operator::supported_tokens()
            .iter()
            .filter_map(|&op| s.find(op).map(|index| (index, op)))
            .min_by_key(|(index, _)| *index)
            .ok_or_else(|| anyhow!("No valid operator found in the filter string"))?;

        let (field, rest) = s.split_at(index);
        let (_, value) = rest.split_at(op.len());

        let field = field.trim();
        let value = value.trim();

        if field.is_empty() || value.is_empty() {
            return Err(anyhow!(
                "Invalid filter format: missing field name or value"
            ));
        }

        Ok((field, op, value))
    }

    /// Casts a single string value to `data_type`, returning a one-element
    /// [`Scalar`] usable with the arrow comparison kernels.
    ///
    /// `safe: false` makes an impossible cast (e.g. `"abc"` to Int32) an
    /// error instead of silently producing a null.
    fn cast_value(value: &[&str; 1], data_type: &DataType) -> Result<Scalar<ArrayRef>> {
        let cast_options = CastOptions {
            safe: false,
            format_options: Default::default(),
        };
        let value = StringArray::from(Vec::from(value));
        Ok(Scalar::new(cast_with_options(
            &value,
            data_type,
            &cast_options,
        )?))
    }
}
+
//! Unit tests for partition filter parsing and partition pruning.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::table::HudiTableConfig::{
        IsHiveStylePartitioning, IsPartitionPathUrlencoded,
    };
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::Datum;
    use hudi_tests::assert_not;
    use std::str::FromStr;

    // Three-field partition schema exercising date, string, and int types.
    fn create_test_schema() -> Schema {
        Schema::new(vec![
            Field::new("date", DataType::Date32, false),
            Field::new("category", DataType::Utf8, false),
            Field::new("count", DataType::Int32, false),
        ])
    }

    #[test]
    fn test_partition_filter_try_from_valid() {
        let schema = create_test_schema();
        // Spaces around the operator are accepted.
        let filter_str = "date = 2023-01-01";
        let filter = PartitionFilter::try_from((filter_str, &schema));
        assert!(filter.is_ok());
        let filter = filter.unwrap();
        assert_eq!(filter.field.name(), "date");
        assert_eq!(filter.operator, Operator::Eq);
        assert_eq!(filter.value.get().0.len(), 1);

        // ...and so is no whitespace at all.
        let filter_str = "category!=foo";
        let filter = PartitionFilter::try_from((filter_str, &schema));
        assert!(filter.is_ok());
        let filter = filter.unwrap();
        assert_eq!(filter.field.name(), "category");
        assert_eq!(filter.operator, Operator::Ne);
        assert_eq!(filter.value.get().0.len(), 1);
        assert_eq!(
            StringArray::from(filter.value.into_inner().to_data()).value(0),
            "foo"
        )
    }

    #[test]
    fn test_partition_filter_try_from_invalid_field() {
        let schema = create_test_schema();
        let filter_str = "invalid_field = 2023-01-01";
        let filter = PartitionFilter::try_from((filter_str, &schema));
        assert!(filter.is_err());
        assert!(filter
            .unwrap_err()
            .to_string()
            .contains("not found in partition schema"));
    }

    #[test]
    fn test_partition_filter_try_from_invalid_operator() {
        let schema = create_test_schema();
        let filter_str = "date ?? 2023-01-01";
        let filter = PartitionFilter::try_from((filter_str, &schema));
        assert!(filter.is_err());
        assert!(filter
            .unwrap_err()
            .to_string()
            .contains("No valid operator found"));
    }

    #[test]
    fn test_partition_filter_try_from_invalid_value() {
        let schema = create_test_schema();
        // cast_value uses safe=false, so an impossible cast is an error.
        let filter_str = "count = not_a_number";
        let filter = PartitionFilter::try_from((filter_str, &schema));
        assert!(filter.is_err());
        assert!(filter.unwrap_err().to_string().contains("Unable to cast"));
    }

    #[test]
    fn test_parse_to_parts_valid() {
        let result = PartitionFilter::parse_to_parts("date = 2023-01-01");
        assert!(result.is_ok());
        let (field, operator, value) = result.unwrap();
        assert_eq!(field, "date");
        assert_eq!(operator, "=");
        assert_eq!(value, "2023-01-01");
    }

    #[test]
    fn test_parse_to_parts_no_operator() {
        let result = PartitionFilter::parse_to_parts("date 2023-01-01");
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("No valid operator found"));
    }

    #[test]
    fn test_parse_to_parts_multiple_operators() {
        // Only the earliest operator splits the string; the rest stays in the value.
        let result = PartitionFilter::parse_to_parts("count >= 10 <= 20");
        assert!(result.is_ok());
        let (field, operator, value) = result.unwrap();
        assert_eq!(field, "count");
        assert_eq!(operator, ">=");
        assert_eq!(value, "10 <= 20");
    }

    #[test]
    fn test_partition_filter_try_from_all_operators() {
        let schema = create_test_schema();
        for &op in Operator::supported_tokens() {
            let filter_str = format!("count {} 10", op);
            let filter = PartitionFilter::try_from((filter_str.as_str(), &schema));
            assert!(filter.is_ok(), "Failed for operator: {}", op);
            let filter = filter.unwrap();
            assert_eq!(filter.field.name(), "count");
            assert_eq!(filter.operator, Operator::from_str(op).unwrap());
        }
    }

    #[test]
    fn test_operator_from_str() {
        assert_eq!(Operator::from_str("=").unwrap(), Operator::Eq);
        assert_eq!(Operator::from_str("!=").unwrap(), Operator::Ne);
        assert_eq!(Operator::from_str("<").unwrap(), Operator::Lt);
        assert_eq!(Operator::from_str("<=").unwrap(), Operator::Lte);
        assert_eq!(Operator::from_str(">").unwrap(), Operator::Gt);
        assert_eq!(Operator::from_str(">=").unwrap(), Operator::Gte);
        assert!(Operator::from_str("??").is_err());
    }

    #[test]
    fn test_operator_supported_tokens() {
        // Pinned order: longest tokens first, declaration order within a length.
        assert_eq!(
            Operator::supported_tokens(),
            &["!=", "<=", ">=", "=", "<", ">"]
        );
    }

    // Builds HudiConfigs with just the two partition-path layout flags set.
    fn create_hudi_configs(is_hive_style: bool, is_url_encoded: bool) -> HudiConfigs {
        HudiConfigs::new(HashMap::from([
            (
                IsHiveStylePartitioning.as_ref().to_string(),
                is_hive_style.to_string(),
            ),
            (
                IsPartitionPathUrlencoded.as_ref().to_string(),
                is_url_encoded.to_string(),
            ),
        ]))
    }

    #[test]
    fn test_partition_pruner_new() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, false);
        let filters = vec!["date > 2023-01-01", "category = A"];

        let pruner = PartitionPruner::new(&filters, &schema, &configs);
        assert!(pruner.is_ok());

        let pruner = pruner.unwrap();
        assert_eq!(pruner.and_filters.len(), 2);
        assert!(pruner.is_hive_style);
        assert_not!(pruner.is_url_encoded);
    }

    #[test]
    fn test_partition_pruner_empty() {
        let pruner = PartitionPruner::empty();
        assert!(pruner.is_empty());
        assert_not!(pruner.is_hive_style);
        assert_not!(pruner.is_url_encoded);
    }

    #[test]
    fn test_partition_pruner_is_empty() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(false, false);

        let pruner_empty = PartitionPruner::new(&[], &schema, &configs).unwrap();
        assert!(pruner_empty.is_empty());

        let pruner_non_empty =
            PartitionPruner::new(&["date > 2023-01-01"], &schema, &configs).unwrap();
        assert_not!(pruner_non_empty.is_empty());
    }

    #[test]
    fn test_partition_pruner_should_include() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, false);
        let filters = vec!["date > 2023-01-01", "category = A", "count <= 100"];

        let pruner = PartitionPruner::new(&filters, &schema, &configs).unwrap();

        assert!(pruner.should_include("date=2023-02-01/category=A/count=10"));
        assert!(pruner.should_include("date=2023-02-01/category=A/count=100"));
        assert_not!(pruner.should_include("date=2022-12-31/category=A/count=10"));
        assert_not!(pruner.should_include("date=2023-02-01/category=B/count=10"));
    }

    #[test]
    fn test_partition_pruner_parse_segments() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, false);
        let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();

        let segments = pruner
            .parse_segments("date=2023-02-01/category=A/count=10")
            .unwrap();
        assert_eq!(segments.len(), 3);
        assert!(segments.contains_key("date"));
        assert!(segments.contains_key("category"));
        assert!(segments.contains_key("count"));
    }

    #[test]
    fn test_partition_pruner_url_encoded() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, true);
        let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();

        // %3D is '=' and %2F is '/'; the whole path is percent-encoded.
        let segments = pruner
            .parse_segments("date%3D2023-02-01%2Fcategory%3DA%2Fcount%3D10")
            .unwrap();
        assert_eq!(segments.len(), 3);
        assert!(segments.contains_key("date"));
        assert!(segments.contains_key("category"));
        assert!(segments.contains_key("count"));
    }

    #[test]
    fn test_partition_pruner_invalid_path() {
        let schema = create_test_schema();
        let configs = create_hudi_configs(true, false);
        let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();

        // Two parts against a three-field schema must fail to parse.
        assert!(pruner.parse_segments("invalid/path").is_err());
    }
}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 3e2fdaf..6aa2035 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -102,7 +102,8 @@ impl TableProvider for HudiDataSource {
 
         let file_slices = self
             .table
-            .split_file_slices(self.get_input_partitions())
+            // TODO: implement supports_filters_pushdown() to pass filters to 
Hudi table API
+            .split_file_slices(self.get_input_partitions(), &[])
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from 
Hudi table: {}", e)))?;
         let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 7c52930..53ea580 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -109,8 +109,9 @@ impl HudiTable {
     }
 
     fn split_file_slices(&self, n: usize, py: Python) -> 
PyResult<Vec<Vec<HudiFileSlice>>> {
+        // TODO: support passing filters
         py.allow_threads(|| {
-            let file_slices = rt().block_on(self._table.split_file_slices(n))?;
+            let file_slices = rt().block_on(self._table.split_file_slices(n, 
&[]))?;
             Ok(file_slices
                 .iter()
                 .map(|inner_vec| 
inner_vec.iter().map(convert_file_slice).collect())
@@ -119,8 +120,9 @@ impl HudiTable {
     }
 
     fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
+        // TODO: support passing filters
         py.allow_threads(|| {
-            let file_slices = rt().block_on(self._table.get_file_slices())?;
+            let file_slices = rt().block_on(self._table.get_file_slices(&[]))?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
         })
     }
@@ -131,7 +133,9 @@ impl HudiTable {
     }
 
     fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
+        // TODO: support passing filters
+        rt().block_on(self._table.read_snapshot(&[]))?
+            .to_pyarrow(py)
     }
 }
 

Reply via email to