This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 9488ab6  feat: add APIs to support incremental query impl (#272)
9488ab6 is described below

commit 9488ab69e4f2d403a6e7bc4a218d0e84f6efc40c
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jan 29 15:55:43 2025 -0600

    feat: add APIs to support incremental query impl (#272)
    
    - Add new table API `get_file_slices_between()` to support reading 
incremental file slices for engine integration
    - Add time range configs for the file group reader to support filtering records 
and scanning log files
      - `hoodie.read.file_group.start_timestamp`
      - `hoodie.read.file_group.end_timestamp`
    - Remove `hoodie.read.as.of.timestamp` from configs in favor of passing 
time travel timestamp via API
    - Refactor the table API implementations to provide a clearer flow for reading 
file slices
      - Push down to the file group reader the logic for checking base-file-only 
reads and composing the instant range
    - Add the corresponding Python APIs
---
 crates/core/src/config/read.rs           |  21 +-
 crates/core/src/file_group/file_slice.rs |   5 +
 crates/core/src/file_group/reader.rs     | 330 ++++++++++++++++---------------
 crates/core/src/table/mod.rs             | 277 +++++++++++++++-----------
 crates/core/src/timeline/mod.rs          |  14 +-
 crates/datafusion/src/lib.rs             |   2 +-
 python/hudi/_internal.pyi                |  44 ++++-
 python/src/internal.rs                   | 135 ++++++++++---
 python/tests/test_table_builder.py       |  52 ++---
 python/tests/test_table_read.py          |   6 +-
 10 files changed, 528 insertions(+), 358 deletions(-)

diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index df0a8ad..f550724 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -33,18 +33,21 @@ use crate::config::{ConfigParser, HudiConfigValue};
 /// **Example**
 ///
 /// ```rust
-/// use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp, 
InputPartitions};
+/// use hudi_core::config::read::HudiReadConfig::InputPartitions;
 /// use hudi_core::table::Table as HudiTable;
 ///
-/// let options = [(InputPartitions, "2"), (AsOfTimestamp, 
"20240101010100000")];
+/// let options = [(InputPartitions, "2")];
 /// HudiTable::new_with_options("/tmp/hudi_data", options)
 /// ```
 ///
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiReadConfig {
-    /// The query instant for time travel. Without specified this option, we 
query the latest snapshot.
-    AsOfTimestamp,
+    /// Start timestamp (exclusive) for [FileGroup] to filter records.
+    FileGroupStartTimestamp,
+
+    /// End timestamp (inclusive) for [FileGroup] to filter records.
+    FileGroupEndTimestamp,
 
     /// Number of input partitions to read the data in parallel.
     ///
@@ -62,7 +65,8 @@ pub enum HudiReadConfig {
 impl AsRef<str> for HudiReadConfig {
     fn as_ref(&self) -> &str {
         match self {
-            Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
+            Self::FileGroupStartTimestamp => 
"hoodie.read.file_group.start_timestamp",
+            Self::FileGroupEndTimestamp => 
"hoodie.read.file_group.end_timestamp",
             Self::InputPartitions => "hoodie.read.input.partitions",
             Self::ListingParallelism => "hoodie.read.listing.parallelism",
             Self::UseReadOptimizedMode => 
"hoodie.read.use.read_optimized.mode",
@@ -95,7 +99,12 @@ impl ConfigParser for HudiReadConfig {
             .ok_or(NotFound(self.key()));
 
         match self {
-            Self::AsOfTimestamp => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::FileGroupStartTimestamp => {
+                get_result.map(|v| HudiConfigValue::String(v.to_string()))
+            }
+            Self::FileGroupEndTimestamp => {
+                get_result.map(|v| HudiConfigValue::String(v.to_string()))
+            }
             Self::InputPartitions => get_result
                 .and_then(|v| {
                     usize::from_str(v).map_err(|e| ParseInt(self.key(), 
v.to_string(), e))
diff --git a/crates/core/src/file_group/file_slice.rs 
b/crates/core/src/file_group/file_slice.rs
index b9df522..6ba2d0d 100644
--- a/crates/core/src/file_group/file_slice.rs
+++ b/crates/core/src/file_group/file_slice.rs
@@ -42,6 +42,11 @@ impl FileSlice {
         }
     }
 
+    #[inline]
+    pub fn has_log_file(&self) -> bool {
+        !self.log_files.is_empty()
+    }
+
     fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
         let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
         path.to_str().map(|s| s.to_string()).ok_or_else(|| {
diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 113a27a..69f9cf4 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -16,81 +16,122 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::config::read::HudiReadConfig;
 use crate::config::table::HudiTableConfig;
 use crate::config::util::split_hudi_options_from_others;
 use crate::config::HudiConfigs;
 use crate::error::CoreError::ReadFileSliceError;
 use crate::expr::filter::{Filter, SchemableFilter};
 use crate::file_group::file_slice::FileSlice;
+use crate::file_group::log_file::scanner::LogFileScanner;
+use crate::merge::record_merger::RecordMerger;
+use crate::metadata::meta_field::MetaField;
 use crate::storage::Storage;
+use crate::timeline::selector::InstantRange;
 use crate::Result;
 use arrow::compute::and;
+use arrow::compute::filter_record_batch;
 use arrow_array::{BooleanArray, RecordBatch};
-use arrow_schema::Schema;
 use futures::TryFutureExt;
+use std::convert::TryFrom;
 use std::sync::Arc;
 
-use crate::file_group::log_file::scanner::LogFileScanner;
-use crate::merge::record_merger::RecordMerger;
-use crate::timeline::selector::InstantRange;
-use arrow::compute::filter_record_batch;
-
-/// File group reader handles all read operations against a file group.
+/// The reader that handles all read operations against a file group.
 #[derive(Clone, Debug)]
 pub struct FileGroupReader {
     hudi_configs: Arc<HudiConfigs>,
     storage: Arc<Storage>,
-    and_filters: Vec<SchemableFilter>,
 }
 
 impl FileGroupReader {
-    pub(crate) fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) 
-> Self {
-        Self {
-            storage,
-            hudi_configs,
-            and_filters: Vec::new(),
-        }
-    }
-
-    pub(crate) fn new_with_filters(
-        storage: Arc<Storage>,
+    pub(crate) fn new_with_configs_and_options<I, K, V>(
         hudi_configs: Arc<HudiConfigs>,
-        and_filters: &[Filter],
-        schema: &Schema,
-    ) -> Result<Self> {
-        let and_filters = and_filters
-            .iter()
-            .map(|filter| SchemableFilter::try_from((filter.clone(), schema)))
-            .collect::<Result<Vec<SchemableFilter>>>()?;
+        options: I,
+    ) -> Result<Self>
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        let (hudi_opts, others) = split_hudi_options_from_others(options);
+
+        let mut final_opts = hudi_configs.as_options();
+        final_opts.extend(hudi_opts);
+        let hudi_configs = Arc::new(HudiConfigs::new(final_opts));
+        let storage = Storage::new(Arc::new(others), hudi_configs.clone())?;
 
         Ok(Self {
-            storage,
             hudi_configs,
-            and_filters,
+            storage,
         })
     }
 
+    /// Creates a new reader with the given base URI and options.
+    ///
+    /// # Arguments
+    ///     * `base_uri` - The base URI of the file group's residing table.
+    ///     * `options` - Additional options for the reader.
     pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> 
Result<Self>
     where
         I: IntoIterator<Item = (K, V)>,
         K: AsRef<str>,
         V: Into<String>,
     {
-        let (mut hudi_opts, others) = split_hudi_options_from_others(options);
-        hudi_opts.insert(
+        let hudi_configs = Arc::new(HudiConfigs::new([(
             HudiTableConfig::BasePath.as_ref().to_string(),
             base_uri.to_string(),
-        );
-
-        let hudi_configs = Arc::new(HudiConfigs::new(hudi_opts));
+        )]));
 
-        let storage = Storage::new(Arc::new(others), hudi_configs.clone())?;
-        Ok(Self::new(hudi_configs, storage))
+        Self::new_with_configs_and_options(hudi_configs, options)
     }
 
-    fn create_boolean_array_mask(&self, records: &RecordBatch) -> 
Result<BooleanArray> {
+    fn create_filtering_mask_for_base_file_records(
+        &self,
+        records: &RecordBatch,
+    ) -> Result<Option<BooleanArray>> {
+        let populates_meta_fields = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::PopulatesMetaFields)
+            .to::<bool>();
+        if !populates_meta_fields {
+            // If meta fields are not populated, commit time filtering is not 
applicable.
+            return Ok(None);
+        }
+
+        let mut and_filters: Vec<SchemableFilter> = Vec::new();
+        let schema = MetaField::schema();
+        if let Some(start) = self
+            .hudi_configs
+            .try_get(HudiReadConfig::FileGroupStartTimestamp)
+            .map(|v| v.to::<String>())
+        {
+            let filter: Filter =
+                Filter::try_from((MetaField::CommitTime.as_ref(), ">", 
start.as_str()))?;
+            let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
+            and_filters.push(filter);
+        } else {
+            // If start timestamp is not provided, the query is snapshot or 
time-travel, so
+            // commit time filtering is not needed as the base file being read 
is already
+            // filtered and selected by the timeline.
+            return Ok(None);
+        }
+
+        if let Some(end) = self
+            .hudi_configs
+            .try_get(HudiReadConfig::FileGroupEndTimestamp)
+            .map(|v| v.to::<String>())
+        {
+            let filter = Filter::try_from((MetaField::CommitTime.as_ref(), 
"<=", end.as_str()))?;
+            let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
+            and_filters.push(filter);
+        }
+
+        if and_filters.is_empty() {
+            return Ok(None);
+        }
+
         let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
-        for filter in &self.and_filters {
+        for filter in &and_filters {
             let col_name = filter.field.name().as_str();
             let col_values = records
                 .column_by_name(col_name)
@@ -99,9 +140,16 @@ impl FileGroupReader {
             let comparison = filter.apply_comparsion(col_values)?;
             mask = and(&mask, &comparison)?;
         }
-        Ok(mask)
+        Ok(Some(mask))
     }
 
+    /// Reads the data from the base file at the given relative path.
+    ///
+    /// # Arguments
+    ///     * `relative_path` - The relative path to the base file.
+    ///
+    /// # Returns
+    /// A record batch read from the base file.
     pub async fn read_file_slice_by_base_file_path(
         &self,
         relative_path: &str,
@@ -112,22 +160,44 @@ impl FileGroupReader {
             .map_err(|e| ReadFileSliceError(format!("Failed to read path 
{relative_path}: {e:?}")))
             .await?;
 
-        if self.and_filters.is_empty() {
-            return Ok(records);
+        if let Some(mask) = 
self.create_filtering_mask_for_base_file_records(&records)? {
+            filter_record_batch(&records, &mask)
+                .map_err(|e| ReadFileSliceError(format!("Failed to filter 
records: {e:?}")))
+        } else {
+            Ok(records)
         }
+    }
 
-        let mask = self.create_boolean_array_mask(&records)?;
-        filter_record_batch(&records, &mask)
-            .map_err(|e| ReadFileSliceError(format!("Failed to filter records: 
{e:?}")))
+    fn create_instant_range_for_log_file_scan(&self) -> InstantRange {
+        let timezone = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::TimelineTimezone)
+            .to::<String>();
+        let start_timestamp = self
+            .hudi_configs
+            .try_get(HudiReadConfig::FileGroupStartTimestamp)
+            .map(|v| v.to::<String>());
+        let end_timestamp = self
+            .hudi_configs
+            .try_get(HudiReadConfig::FileGroupEndTimestamp)
+            .map(|v| v.to::<String>());
+        InstantRange::new(timezone, start_timestamp, end_timestamp, false, 
true)
     }
 
-    pub(crate) async fn read_file_slice(
-        &self,
-        file_slice: &FileSlice,
-        base_file_only: bool,
-        instant_range: InstantRange,
-    ) -> Result<RecordBatch> {
+    /// Reads the data from the given file slice.
+    ///
+    /// # Arguments
+    ///     * `file_slice` - The file slice to read.
+    ///
+    /// # Returns
+    /// A record batch read from the file slice.
+    pub async fn read_file_slice(&self, file_slice: &FileSlice) -> 
Result<RecordBatch> {
         let relative_path = file_slice.base_file_relative_path()?;
+        let base_file_only = !file_slice.has_log_file()
+            || self
+                .hudi_configs
+                .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+                .to::<bool>();
         if base_file_only {
             self.read_file_slice_by_base_file_path(&relative_path).await
         } else {
@@ -142,6 +212,7 @@ impl FileGroupReader {
                 .iter()
                 .map(|log_file| file_slice.log_file_relative_path(log_file))
                 .collect::<Result<Vec<String>>>()?;
+            let instant_range = self.create_instant_range_for_log_file_scan();
             let log_record_batches =
                 LogFileScanner::new(self.hudi_configs.clone(), 
self.storage.clone())
                     .scan(log_file_paths, &instant_range)
@@ -159,71 +230,12 @@ impl FileGroupReader {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::config::util::empty_options;
     use crate::error::CoreError;
-    use crate::expr::filter::FilterField;
     use arrow::array::{ArrayRef, Int64Array, StringArray};
     use arrow::record_batch::RecordBatch;
     use arrow_schema::{DataType, Field, Schema};
     use std::sync::Arc;
-    use url::Url;
-
-    #[test]
-    fn test_new() {
-        let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
-        let storage = Storage::new_with_base_url(base_url).unwrap();
-        let fg_reader = FileGroupReader::new(Arc::from(HudiConfigs::empty()), 
storage.clone());
-        assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
-    }
-
-    fn create_test_schema() -> Schema {
-        Schema::new(vec![
-            Field::new("_hoodie_commit_time", DataType::Utf8, false),
-            Field::new("name", DataType::Utf8, false),
-            Field::new("age", DataType::Int64, false),
-        ])
-    }
-
-    #[tokio::test]
-    async fn test_new_with_filters() -> Result<()> {
-        let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
-        let storage = Storage::new_with_base_url(base_url)?;
-        let schema = create_test_schema();
-        let empty_configs = Arc::new(HudiConfigs::empty());
-
-        // Test case 1: Empty filters
-        let reader = FileGroupReader::new_with_filters(
-            storage.clone(),
-            empty_configs.clone(),
-            &[],
-            &schema,
-        )?;
-        assert!(reader.and_filters.is_empty());
-
-        // Test case 2: Multiple filters
-        let filters = vec![
-            FilterField::new("_hoodie_commit_time").gt("0"),
-            FilterField::new("age").gte("18"),
-        ];
-        let reader = FileGroupReader::new_with_filters(
-            storage.clone(),
-            empty_configs.clone(),
-            &filters,
-            &schema,
-        )?;
-        assert_eq!(reader.and_filters.len(), 2);
-
-        // Test case 3: Invalid field name should error
-        let invalid_filters = 
vec![FilterField::new("non_existent_field").eq("value")];
-        assert!(FileGroupReader::new_with_filters(
-            storage.clone(),
-            empty_configs.clone(),
-            &invalid_filters,
-            &schema
-        )
-        .is_err());
-
-        Ok(())
-    }
 
     #[test]
     fn test_new_with_options() -> Result<()> {
@@ -239,11 +251,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_read_file_slice_returns_error() {
-        let storage =
-            
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())
+        let reader =
+            
FileGroupReader::new_with_options("file:///non-existent-path/table", 
empty_options())
                 .unwrap();
-        let empty_configs = Arc::new(HudiConfigs::empty());
-        let reader = FileGroupReader::new(empty_configs.clone(), storage);
         let result = reader
             .read_file_slice_by_base_file_path("non_existent_file")
             .await;
@@ -251,7 +261,12 @@ mod tests {
     }
 
     fn create_test_record_batch() -> Result<RecordBatch> {
-        let schema = Arc::new(create_test_schema());
+        let schema = Schema::new(vec![
+            Field::new("_hoodie_commit_time", DataType::Utf8, false),
+            Field::new("name", DataType::Utf8, false),
+            Field::new("age", DataType::Int64, false),
+        ]);
+        let schema = Arc::new(schema);
 
         let commit_times: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", 
"3", "4", "5"]));
         let names: ArrayRef = Arc::new(StringArray::from(vec![
@@ -263,67 +278,60 @@ mod tests {
     }
 
     #[test]
-    fn test_create_boolean_array_mask() -> Result<()> {
-        let storage =
-            
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())?;
-        let empty_configs = Arc::new(HudiConfigs::empty());
-        let schema = create_test_schema();
+    fn test_create_filtering_mask_for_base_file_records() -> Result<()> {
+        let base_uri = "file:///non-existent-path/table";
         let records = create_test_record_batch()?;
-
-        // Test case 1: No filters
-        let reader = FileGroupReader::new_with_filters(
-            storage.clone(),
-            empty_configs.clone(),
-            &[],
-            &schema,
+        // Test case 1: No meta fields populated
+        let reader = FileGroupReader::new_with_options(
+            base_uri,
+            [
+                (HudiTableConfig::PopulatesMetaFields.as_ref(), "false"),
+                (HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
+            ],
         )?;
-        let mask = reader.create_boolean_array_mask(&records)?;
-        assert_eq!(mask, BooleanArray::from(vec![true; 5]));
-
-        // Test case 2: Single filter on commit time
-        let filters = vec![FilterField::new("_hoodie_commit_time").gt("2")];
-        let reader = FileGroupReader::new_with_filters(
-            storage.clone(),
-            empty_configs.clone(),
-            &filters,
-            &schema,
+        let mask = 
reader.create_filtering_mask_for_base_file_records(&records)?;
+        assert_eq!(mask, None, "Commit time filtering should not be needed");
+
+        // Test case 2: No commit time filtering options
+        let reader = FileGroupReader::new_with_options(base_uri, 
empty_options())?;
+        let mask = 
reader.create_filtering_mask_for_base_file_records(&records)?;
+        assert_eq!(mask, None);
+
+        // Test case 3: Filtering commit time > '2'
+        let reader = FileGroupReader::new_with_options(
+            base_uri,
+            [(HudiReadConfig::FileGroupStartTimestamp, "2")],
         )?;
-        let mask = reader.create_boolean_array_mask(&records)?;
+        let mask = 
reader.create_filtering_mask_for_base_file_records(&records)?;
         assert_eq!(
             mask,
-            BooleanArray::from(vec![false, false, true, true, true]),
+            Some(BooleanArray::from(vec![false, false, true, true, true])),
             "Expected only records with commit_time > '2'"
         );
 
-        // Test case 3: Multiple AND filters
-        let filters = vec![
-            FilterField::new("_hoodie_commit_time").gt("2"),
-            FilterField::new("age").lt("40"),
-        ];
-        let reader = FileGroupReader::new_with_filters(
-            storage.clone(),
-            empty_configs.clone(),
-            &filters,
-            &schema,
+        // Test case 4: Filtering commit time <= '4'
+        let reader = FileGroupReader::new_with_options(
+            base_uri,
+            [(HudiReadConfig::FileGroupEndTimestamp, "4")],
+        )?;
+        let mask = 
reader.create_filtering_mask_for_base_file_records(&records)?;
+        assert_eq!(mask, None, "Commit time filtering should not be needed");
+
+        // Test case 5: Filtering commit time > '2' and <= '4'
+        let reader = FileGroupReader::new_with_options(
+            base_uri,
+            [
+                (HudiReadConfig::FileGroupStartTimestamp, "2"),
+                (HudiReadConfig::FileGroupEndTimestamp, "4"),
+            ],
         )?;
-        let mask = reader.create_boolean_array_mask(&records)?;
+        let mask = 
reader.create_filtering_mask_for_base_file_records(&records)?;
         assert_eq!(
             mask,
-            BooleanArray::from(vec![false, false, true, false, false]),
-            "Expected only record with commit_time > '2' AND age < 40"
+            Some(BooleanArray::from(vec![false, false, true, true, false])),
+            "Expected only records with commit_time > '2' and <= '4'"
         );
 
-        // Test case 4: Filter resulting in all false
-        let filters = vec![FilterField::new("age").gt("100")];
-        let reader = FileGroupReader::new_with_filters(
-            storage.clone(),
-            empty_configs.clone(),
-            &filters,
-            &schema,
-        )?;
-        let mask = reader.create_boolean_array_mask(&records)?;
-        assert_eq!(mask, BooleanArray::from(vec![false; 5]));
-
         Ok(())
     }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7a2cf09..511f261 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -90,7 +90,6 @@ mod fs_view;
 mod listing;
 pub mod partition;
 
-use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
 use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
@@ -100,11 +99,10 @@ use crate::file_group::reader::FileGroupReader;
 use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
-use crate::timeline::Timeline;
+use crate::timeline::{Timeline, EARLIEST_START_TIMESTAMP};
 use crate::Result;
 
-use crate::metadata::meta_field::MetaField;
-use crate::timeline::selector::InstantRange;
+use crate::config::read::HudiReadConfig;
 use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
 use std::collections::{HashMap, HashSet};
@@ -211,22 +209,18 @@ impl Table {
         Ok(Schema::new(partition_fields))
     }
 
-    /// Get all the [FileSlice]s in the table.
-    ///
-    /// The file slices are split into `n` chunks.
-    ///
-    /// If the [AsOfTimestamp] configuration is set, the file slices at the 
specified timestamp will be returned.
+    /// Get all the [FileSlice]s in splits from the table.
     ///
+    /// # Arguments
+    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `filters` - Partition filters to apply.
     pub async fn get_file_slices_splits(
         &self,
         n: usize,
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<Vec<FileSlice>>> {
-        let filters = from_str_tuples(filters)?;
-        if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.get_file_slices_splits_internal(n, 
timestamp.to::<String>().as_str(), &filters)
-                .await
-        } else if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+            let filters = from_str_tuples(filters)?;
             self.get_file_slices_splits_internal(n, timestamp, &filters)
                 .await
         } else {
@@ -234,6 +228,12 @@ impl Table {
         }
     }
 
+    /// Get all the [FileSlice]s in splits from the table at a given timestamp.
+    ///
+    /// # Arguments
+    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `timestamp` - The timestamp which file slices associated with.
+    ///     * `filters` - Partition filters to apply.
     pub async fn get_file_slices_splits_as_of(
         &self,
         n: usize,
@@ -267,20 +267,28 @@ impl Table {
 
     /// Get all the [FileSlice]s in the table.
     ///
-    /// If the [AsOfTimestamp] configuration is set, the file slices at the 
specified timestamp will be returned.
+    /// # Arguments
+    ///     * `filters` - Partition filters to apply.
+    ///
+    /// # Notes
+    ///     * This API is useful for implementing snapshot query.
     pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) -> 
Result<Vec<FileSlice>> {
-        let filters = from_str_tuples(filters)?;
-        if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.get_file_slices_internal(timestamp.to::<String>().as_str(), 
&filters)
-                .await
-        } else if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+            let filters = from_str_tuples(filters)?;
             self.get_file_slices_internal(timestamp, &filters).await
         } else {
             Ok(Vec::new())
         }
     }
 
-    /// Get all the [FileSlice]s at a given timestamp, as a time travel query.
+    /// Get all the [FileSlice]s in the table at a given timestamp.
+    ///
+    /// # Arguments
+    ///     * `timestamp` - The timestamp which file slices associated with.
+    ///     * `filters` - Partition filters to apply.
+    ///
+    /// # Notes
+    ///     * This API is useful for implementing time travel query.
     pub async fn get_file_slices_as_of(
         &self,
         timestamp: &str,
@@ -307,91 +315,124 @@ impl Table {
             .await
     }
 
-    pub fn create_file_group_reader(&self) -> FileGroupReader {
-        FileGroupReader::new(
-            self.hudi_configs.clone(),
-            self.file_system_view.storage.clone(),
-        )
+    /// Get all the changed [FileSlice]s in the table between the given 
timestamps.
+    ///
+    /// # Arguments
+    ///     * `start_timestamp` - If provided, only file slices that were 
changed after this timestamp will be returned.
+    ///     * `end_timestamp` - If provided, only file slices that were 
changed before or at this timestamp will be returned.
+    ///
+    /// # Notes
+    ///     * This API is useful for implementing incremental query.
+    pub async fn get_file_slices_between(
+        &self,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<FileSlice>> {
+        // If the end timestamp is not provided, use the latest commit 
timestamp.
+        let Some(end) = end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp())
+        else {
+            // No latest commit timestamp means the table is empty.
+            return Ok(Vec::new());
+        };
+
+        let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+        self.get_file_slices_between_internal(start, end).await
     }
 
-    fn create_file_group_reader_with_filters(
+    async fn get_file_slices_between_internal(
         &self,
-        filters: &[(&str, &str, &str)],
-        schema: &Schema,
-    ) -> Result<FileGroupReader> {
-        let filters = from_str_tuples(filters)?;
-        FileGroupReader::new_with_filters(
-            self.file_system_view.storage.clone(),
+        start_timestamp: &str,
+        end_timestamp: &str,
+    ) -> Result<Vec<FileSlice>> {
+        let mut file_slices: Vec<FileSlice> = Vec::new();
+        let file_groups = self
+            .timeline
+            .get_file_groups_between(Some(start_timestamp), 
Some(end_timestamp))
+            .await?;
+        for file_group in file_groups {
+            if let Some(file_slice) = 
file_group.get_file_slice_as_of(end_timestamp) {
+                file_slices.push(file_slice.clone());
+            }
+        }
+
+        Ok(file_slices)
+    }
+
+    /// Create a [FileGroupReader] using the [Table]'s Hudi configs, and 
overwriting options.
+    pub fn create_file_group_reader_with_options<I, K, V>(
+        &self,
+        options: I,
+    ) -> Result<FileGroupReader>
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        let mut overwriting_options = 
HashMap::with_capacity(self.storage_options.len());
+        for (k, v) in self.storage_options.iter() {
+            overwriting_options.insert(k.clone(), v.clone());
+        }
+        for (k, v) in options {
+            overwriting_options.insert(k.as_ref().to_string(), v.into());
+        }
+        FileGroupReader::new_with_configs_and_options(
             self.hudi_configs.clone(),
-            &filters,
-            schema,
+            overwriting_options,
         )
     }
 
     /// Get all the latest records in the table.
     ///
-    /// If the [AsOfTimestamp] configuration is set, the records at the 
specified timestamp will be returned.
+    /// # Arguments
+    ///     * `filters` - Partition filters to apply.
     pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) -> 
Result<Vec<RecordBatch>> {
-        let filters = from_str_tuples(filters)?;
-        let read_optimized_mode = self
-            .hudi_configs
-            .get_or_default(UseReadOptimizedMode)
-            .to::<bool>();
-
-        if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.read_snapshot_internal(
-                timestamp.to::<String>().as_str(),
-                &filters,
-                read_optimized_mode,
-            )
-            .await
-        } else if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp() {
-            self.read_snapshot_internal(timestamp, &filters, 
read_optimized_mode)
-                .await
+        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+            let filters = from_str_tuples(filters)?;
+            self.read_snapshot_internal(timestamp, &filters).await
         } else {
             Ok(Vec::new())
         }
     }
 
-    /// Get all the records in the table at a given timestamp, as a time 
travel query.
+    /// Get all the records in the table at a given timestamp.
+    ///
+    /// # Arguments
+    ///     * `timestamp` - The timestamp which records associated with.
+    ///     * `filters` - Partition filters to apply.
     pub async fn read_snapshot_as_of(
         &self,
         timestamp: &str,
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<RecordBatch>> {
         let filters = from_str_tuples(filters)?;
-        let read_optimized_mode = self
-            .hudi_configs
-            .get_or_default(UseReadOptimizedMode)
-            .to::<bool>();
-        self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
-            .await
+        self.read_snapshot_internal(timestamp, &filters).await
     }
 
     async fn read_snapshot_internal(
         &self,
         timestamp: &str,
         filters: &[Filter],
-        read_optimized_mode: bool,
     ) -> Result<Vec<RecordBatch>> {
         let file_slices = self.get_file_slices_internal(timestamp, 
filters).await?;
-        let fg_reader = self.create_file_group_reader();
-        let base_file_only =
-            read_optimized_mode || self.table_type() == 
TableTypeValue::CopyOnWrite;
-        let timezone = self.timezone();
-        let instant_range = InstantRange::up_to(timestamp, &timezone);
-        let batches = futures::future::try_join_all(
-            file_slices
-                .iter()
-                .map(|f| fg_reader.read_file_slice(f, base_file_only, 
instant_range.clone())),
-        )
-        .await?;
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+        let batches =
+            futures::future::try_join_all(file_slices.iter().map(|f| 
fg_reader.read_file_slice(f)))
+                .await?;
         Ok(batches)
     }
 
     /// Get records that were inserted or updated between the given timestamps.
+    ///
     /// Records that were updated multiple times should have their latest 
states within
     /// the time span being returned.
+    ///
+    /// # Arguments
+    ///     * `start_timestamp` - Only records that were inserted or updated 
after this timestamp will be returned.
+    ///     * `end_timestamp` - If provided, only records that were inserted 
or updated before or at this timestamp will be returned.
     pub async fn read_incremental_records(
         &self,
         start_timestamp: &str,
@@ -404,41 +445,23 @@ impl Table {
             return Ok(Vec::new());
         };
 
-        // Use timestamp range to get the target file slices.
-        let mut file_slices: Vec<FileSlice> = Vec::new();
-        let file_groups = self
-            .timeline
-            .get_incremental_file_groups(Some(start_timestamp), 
Some(end_timestamp))
+        let file_slices = self
+            .get_file_slices_between_internal(start_timestamp, end_timestamp)
             .await?;
-        for file_group in file_groups {
-            if let Some(file_slice) = 
file_group.get_file_slice_as_of(end_timestamp) {
-                file_slices.push(file_slice.clone());
-            }
-        }
 
-        // Read incremental records from the file slices.
-        let filters = &[
-            (MetaField::CommitTime.as_ref(), ">", start_timestamp),
-            (MetaField::CommitTime.as_ref(), "<=", end_timestamp),
-        ];
-        let schema = MetaField::schema();
-        let fg_reader = self.create_file_group_reader_with_filters(filters, 
&schema)?;
-
-        // Read-optimized mode does not apply to incremental query semantics.
-        let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
-        let timezone = self.timezone();
-        let instant_range =
-            InstantRange::within_open_closed(start_timestamp, end_timestamp, 
&timezone);
-        let batches = futures::future::try_join_all(
-            file_slices
-                .iter()
-                .map(|f| fg_reader.read_file_slice(f, base_file_only, 
instant_range.clone())),
-        )
-        .await?;
+        let fg_reader = self.create_file_group_reader_with_options([
+            (HudiReadConfig::FileGroupStartTimestamp, start_timestamp),
+            (HudiReadConfig::FileGroupEndTimestamp, end_timestamp),
+        ])?;
+
+        let batches =
+            futures::future::try_join_all(file_slices.iter().map(|f| 
fg_reader.read_file_slice(f)))
+                .await?;
         Ok(batches)
     }
 
     /// Get the change-data-capture (CDC) records between the given timestamps.
+    ///
     /// The CDC records should reflect the records that were inserted, 
updated, and deleted
     /// between the timestamps.
     #[allow(dead_code)]
@@ -454,13 +477,13 @@ impl Table {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::config::read::HudiReadConfig::AsOfTimestamp;
     use crate::config::table::HudiTableConfig::{
         BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields, 
IsHiveStylePartitioning,
         IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields, 
PopulatesMetaFields,
         PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
         TimelineLayoutVersion, TimelineTimezone,
     };
+    use crate::config::util::empty_options;
     use crate::config::HUDI_CONF_DIR;
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
@@ -799,7 +822,8 @@ mod tests {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let batches = hudi_table
-            .create_file_group_reader()
+            .create_file_group_reader_with_options(empty_options())
+            .unwrap()
             .read_file_slice_by_base_file_path(
                 
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
             )
@@ -830,7 +854,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn hudi_table_get_file_slices_splits_as_of() {
+    async fn hudi_table_get_file_slices_splits_as_of_timestamps() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
 
@@ -907,11 +931,13 @@ mod tests {
         );
 
         // as of just smaller than the latest timestamp
-        let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
-        let hudi_table = Table::new_with_options(base_url.path(), opts)
+        let hudi_table = Table::new_with_options(base_url.path(), 
empty_options())
+            .await
+            .unwrap();
+        let file_slices = hudi_table
+            .get_file_slices_as_of("20240418173551905", &[])
             .await
             .unwrap();
-        let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -921,11 +947,13 @@ mod tests {
         );
 
         // as of non-exist old timestamp
-        let opts = [(AsOfTimestamp.as_ref(), "19700101000000")];
-        let hudi_table = Table::new_with_options(base_url.path(), opts)
+        let hudi_table = Table::new_with_options(base_url.path(), 
empty_options())
+            .await
+            .unwrap();
+        let file_slices = hudi_table
+            .get_file_slices_as_of("19700101000000", &[])
             .await
             .unwrap();
-        let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -935,6 +963,29 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn empty_hudi_table_get_file_slices_between_timestamps() {
+        let base_url = SampleTable::V6Empty.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let file_slices = hudi_table
+            .get_file_slices_between(Some(EARLIEST_START_TIMESTAMP), None)
+            .await
+            .unwrap();
+        assert!(file_slices.is_empty())
+    }
+
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_between_timestamps() {
+        let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let file_slices = hudi_table
+            .get_file_slices_between(None, Some("20250121000656060"))
+            .await
+            .unwrap();
+        assert_eq!(file_slices.len(), 3);
+        // TODO: Add more assertions
+    }
+
     #[tokio::test]
     async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
@@ -1071,9 +1122,11 @@ mod tests {
         #[tokio::test]
         async fn test_non_partitioned_read_optimized() -> Result<()> {
             let base_url = SampleTable::V6Nonpartitioned.url_to_mor();
-            let hudi_table =
-                Table::new_with_options(base_url.path(), 
[(UseReadOptimizedMode.as_ref(), "true")])
-                    .await?;
+            let hudi_table = Table::new_with_options(
+                base_url.path(),
+                [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
+            )
+            .await?;
             let commit_timestamps = hudi_table
                 .timeline
                 .completed_commits
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index dccd08a..9722393 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -45,9 +45,11 @@ pub struct Timeline {
     pub completed_commits: Vec<Instant>,
 }
 
+pub const EARLIEST_START_TIMESTAMP: &str = "19700101000000000";
+
 impl Timeline {
     #[cfg(test)]
-    pub async fn new_from_completed_commits(
+    pub(crate) async fn new_from_completed_commits(
         hudi_configs: Arc<HudiConfigs>,
         storage_options: Arc<HashMap<String, String>>,
         completed_commits: Vec<Instant>,
@@ -60,7 +62,7 @@ impl Timeline {
         })
     }
 
-    pub async fn new_from_storage(
+    pub(crate) async fn new_from_storage(
         hudi_configs: Arc<HudiConfigs>,
         storage_options: Arc<HashMap<String, String>>,
     ) -> Result<Self> {
@@ -104,7 +106,7 @@ impl Timeline {
         Ok(instants)
     }
 
-    pub fn get_latest_commit_timestamp(&self) -> Option<&str> {
+    pub(crate) fn get_latest_commit_timestamp(&self) -> Option<&str> {
         self.completed_commits
             .iter()
             .next_back()
@@ -126,7 +128,7 @@ impl Timeline {
             .map_err(|e| CoreError::Timeline(format!("Failed to get commit 
metadata: {}", e)))
     }
 
-    pub async fn get_latest_schema(&self) -> Result<Schema> {
+    pub(crate) async fn get_latest_schema(&self) -> Result<Schema> {
         let commit_metadata = self.get_latest_commit_metadata().await?;
 
         let first_partition = commit_metadata
@@ -181,7 +183,7 @@ impl Timeline {
         }
     }
 
-    pub async fn get_replaced_file_groups_as_of(
+    pub(crate) async fn get_replaced_file_groups_as_of(
         &self,
         timestamp: &str,
     ) -> Result<HashSet<FileGroup>> {
@@ -203,7 +205,7 @@ impl Timeline {
 
     /// Get file groups in the timeline ranging from start (exclusive) to end 
(inclusive).
     /// File groups are as of the [end] timestamp or the latest if not given.
-    pub async fn get_incremental_file_groups(
+    pub(crate) async fn get_file_groups_between(
         &self,
         start_timestamp: Option<&str>,
         end_timestamp: Option<&str>,
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 50a7a05..094c41c 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -67,7 +67,7 @@ use hudi_core::util::StrTupleRef;
 /// // Create a new HudiDataSource with specific read options
 /// let hudi = HudiDataSource::new_with_options(
 ///     "/tmp/trips_table",
-///     [("hoodie.read.as.of.timestamp", "20241122010827898")]).await?;
+///     [("hoodie.read.input.partitions", 5)]).await?;
 ///
 /// // Register the Hudi table with the session context
 /// ctx.register_table("trips_table", Arc::new(hudi))?;
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index d6f6623..a884265 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -24,11 +24,11 @@ __version__: str
 @dataclass(init=False)
 class HudiFileGroupReader:
     """
-    A reader for a group of Hudi file slices. Allows reading of records from 
the base file in a Hudi table.
+    The reader that handles all read operations against a file group.
 
     Attributes:
-        base_uri (str): The base URI of the Hudi table.
-        options (Optional[Dict[str, str]]): Additional options for reading the 
file group.
+        base_uri (str): The base URI of the file group's residing table.
+        options (Optional[Dict[str, str]]): Additional options for the reader.
     """
     def __init__(self, base_uri: str, options: Optional[Dict[str, str]] = 
None):
         """
@@ -43,13 +43,24 @@ class HudiFileGroupReader:
         self, relative_path: str
     ) -> "pyarrow.RecordBatch":
         """
-        Reads the data from the base file given a relative path.
+        Reads the data from the base file at the given relative path.
 
         Parameters:
             relative_path (str): The relative path to the base file.
 
         Returns:
-            pyarrow.RecordBatch: A batch of records read from the base file.
+            pyarrow.RecordBatch: A record batch read from the base file.
+        """
+        ...
+    def read_file_slice(self, file_slice: HudiFileSlice) -> 
"pyarrow.RecordBatch":
+        """
+        Reads the data from the given file slice.
+
+        Parameters:
+            file_slice (HudiFileSlice): The file slice to read from.
+
+        Returns:
+            pyarrow.RecordBatch: A record batch read from the file slice.
         """
         ...
 
@@ -66,6 +77,7 @@ class HudiFileSlice:
         base_file_name (str): The name of the base file.
         base_file_size (int): The on-disk size of the base file in bytes.
         base_file_byte_size (int): The in-memory size of the base file in 
bytes.
+        log_file_names (List[str]): The names of the ordered log files.
         num_records (int): The number of records in the file slice.
     """
 
@@ -75,6 +87,7 @@ class HudiFileSlice:
     base_file_name: str
     base_file_size: int
     base_file_byte_size: int
+    log_file_names: List[str]
     num_records: int
 
     def base_file_relative_path(self) -> str:
@@ -85,6 +98,14 @@ class HudiFileSlice:
             str: The relative path of the base file.
         """
         ...
+    def log_files_relative_paths(self) -> List[str]:
+        """
+        Returns the relative paths of the log files for this file slice.
+
+        Returns:
+            List[str]: A list of relative paths of the log files.
+        """
+        ...
 
 @dataclass(init=False)
 class HudiTable:
@@ -182,7 +203,18 @@ class HudiTable:
         Retrieves all file slices in the Hudi table as of a timestamp, 
optionally filtered by the provided filters.
         """
         ...
-    def create_file_group_reader(self) -> HudiFileGroupReader:
+    def get_file_slices_between(
+        self,
+        start_timestamp: Optional[str],
+        end_timestamp: Optional[str],
+    ) -> List[HudiFileSlice]:
+        """
+        Retrieves all changed file slices in the Hudi table between the given 
timestamps.
+        """
+        ...
+    def create_file_group_reader_with_options(
+        self, options: Optional[Dict[str, str]] = None
+    ) -> HudiFileGroupReader:
         """
         Creates a HudiFileGroupReader for reading records from file groups in 
the Hudi table.
 
diff --git a/python/src/internal.rs b/python/src/internal.rs
index b09ea12..6ee9a01 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -27,6 +27,7 @@ use tokio::runtime::Runtime;
 use hudi::error::CoreError;
 use hudi::file_group::file_slice::FileSlice;
 use hudi::file_group::reader::FileGroupReader;
+use hudi::file_group::FileGroup;
 use hudi::storage::error::StorageError;
 use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
@@ -82,6 +83,34 @@ impl HudiFileGroupReader {
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
+    fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> 
PyResult<PyObject> {
+        let mut file_group = FileGroup::new(
+            file_slice.file_id.clone(),
+            file_slice.partition_path.clone(),
+        );
+        file_group
+            .add_base_file_from_name(&file_slice.base_file_name)
+            .map_err(PythonError::from)?;
+        for name in file_slice.log_file_names.iter() {
+            file_group
+                .add_log_file_from_name(name)
+                .map_err(PythonError::from)?;
+        }
+        let (_, inner_file_slice) = file_group
+            .file_slices
+            .iter()
+            .next()
+            .ok_or_else(|| {
+                CoreError::FileGroup(format!(
+                    "Failed to get file slice from file group: {:?}",
+                    file_group
+                ))
+            })
+            .map_err(PythonError::from)?;
+        rt().block_on(self.inner.read_file_slice(inner_file_slice))
+            .map_err(PythonError::from)?
+            .to_pyarrow(py)
+    }
 }
 
 #[cfg(not(tarpaulin))]
@@ -101,6 +130,8 @@ pub struct HudiFileSlice {
     #[pyo3(get)]
     base_file_byte_size: i64,
     #[pyo3(get)]
+    log_file_names: Vec<String>,
+    #[pyo3(get)]
     num_records: i64,
 }
 
@@ -112,34 +143,59 @@ impl HudiFileSlice {
             .join(&self.base_file_name)
             .to_str()
             .map(String::from)
-            .ok_or(StorageError::InvalidPath(format!(
-                "Failed to get base file relative path for file slice: {:?}",
-                self
-            )))
+            .ok_or_else(|| {
+                StorageError::InvalidPath(format!(
+                    "Failed to get base file relative path for file slice: 
{:?}",
+                    self
+                ))
+            })
             .map_err(CoreError::from)
             .map_err(PythonError::from)?;
         Ok(path)
     }
+    fn log_files_relative_paths(&self) -> PyResult<Vec<String>> {
+        let mut paths = Vec::<String>::new();
+        for name in self.log_file_names.iter() {
+            let p = PathBuf::from(&self.partition_path)
+                .join(name)
+                .to_str()
+                .map(String::from)
+                .ok_or_else(|| {
+                    StorageError::InvalidPath(format!(
+                        "Failed to get log file relative path for file slice: 
{:?}",
+                        self
+                    ))
+                })
+                .map_err(CoreError::from)
+                .map_err(PythonError::from)?;
+            paths.push(p)
+        }
+        Ok(paths)
+    }
 }
 
 #[cfg(not(tarpaulin))]
-fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
-    let file_id = f.file_id().to_string();
-    let partition_path = f.partition_path.to_string();
-    let creation_instant_time = f.creation_instant_time().to_string();
-    let base_file_name = f.base_file.file_name();
-    let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();
-    let base_file_size = file_metadata.size;
-    let base_file_byte_size = file_metadata.byte_size;
-    let num_records = file_metadata.num_records;
-    HudiFileSlice {
-        file_id,
-        partition_path,
-        creation_instant_time,
-        base_file_name,
-        base_file_size,
-        base_file_byte_size,
-        num_records,
+impl From<&FileSlice> for HudiFileSlice {
+    fn from(f: &FileSlice) -> Self {
+        let file_id = f.file_id().to_string();
+        let partition_path = f.partition_path.to_string();
+        let creation_instant_time = f.creation_instant_time().to_string();
+        let base_file_name = f.base_file.file_name();
+        let file_metadata = 
f.base_file.file_metadata.clone().unwrap_or_default();
+        let base_file_size = file_metadata.size;
+        let base_file_byte_size = file_metadata.byte_size;
+        let log_file_names = f.log_files.iter().map(|l| 
l.file_name()).collect();
+        let num_records = file_metadata.num_records;
+        HudiFileSlice {
+            file_id,
+            partition_path,
+            creation_instant_time,
+            base_file_name,
+            base_file_size,
+            base_file_byte_size,
+            log_file_names,
+            num_records,
+        }
     }
 }
 
@@ -202,7 +258,7 @@ impl HudiTable {
                 .map_err(PythonError::from)?;
             Ok(file_slices
                 .iter()
-                .map(|inner_vec| 
inner_vec.iter().map(convert_file_slice).collect())
+                .map(|inner_vec| 
inner_vec.iter().map(HudiFileSlice::from).collect())
                 .collect())
         })
     }
@@ -226,7 +282,7 @@ impl HudiTable {
                 .map_err(PythonError::from)?;
             Ok(file_slices
                 .iter()
-                .map(|inner_vec| 
inner_vec.iter().map(convert_file_slice).collect())
+                .map(|inner_vec| 
inner_vec.iter().map(HudiFileSlice::from).collect())
                 .collect())
         })
     }
@@ -243,7 +299,7 @@ impl HudiTable {
             let file_slices = rt()
                 .block_on(self.inner.get_file_slices(&filters.as_strs()))
                 .map_err(PythonError::from)?;
-            Ok(file_slices.iter().map(convert_file_slice).collect())
+            Ok(file_slices.iter().map(HudiFileSlice::from).collect())
         })
     }
 
@@ -263,12 +319,37 @@ impl HudiTable {
                         .get_file_slices_as_of(timestamp, &filters.as_strs()),
                 )
                 .map_err(PythonError::from)?;
-            Ok(file_slices.iter().map(convert_file_slice).collect())
+            Ok(file_slices.iter().map(HudiFileSlice::from).collect())
+        })
+    }
+
+    #[pyo3(signature = (start_timestamp=None, end_timestamp=None))]
+    fn get_file_slices_between(
+        &self,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+        py: Python,
+    ) -> PyResult<Vec<HudiFileSlice>> {
+        py.allow_threads(|| {
+            let file_slices = rt()
+                .block_on(
+                    self.inner
+                        .get_file_slices_between(start_timestamp, 
end_timestamp),
+                )
+                .map_err(PythonError::from)?;
+            Ok(file_slices.iter().map(HudiFileSlice::from).collect())
         })
     }
 
-    fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
-        let fg_reader = self.inner.create_file_group_reader();
+    #[pyo3(signature = (options=None))]
+    fn create_file_group_reader_with_options(
+        &self,
+        options: Option<HashMap<String, String>>,
+    ) -> PyResult<HudiFileGroupReader> {
+        let fg_reader = self
+            .inner
+            .create_file_group_reader_with_options(options.unwrap_or_default())
+            .map_err(PythonError::from)?;
         Ok(HudiFileGroupReader { inner: fg_reader })
     }
 
diff --git a/python/tests/test_table_builder.py 
b/python/tests/test_table_builder.py
index 1ae5099..6a0cb22 100644
--- a/python/tests/test_table_builder.py
+++ b/python/tests/test_table_builder.py
@@ -113,11 +113,19 @@ def 
test_read_table_returns_correct_data(get_sample_table):
 @pytest.mark.parametrize(
     "hudi_options,storage_options,options",
     [
-        ({"hoodie.read.as.of.timestamp": "20240402123035233"}, {}, {}),
-        ({}, {}, {"hoodie.read.as.of.timestamp": "20240402123035233"}),
+        (
+            {"hoodie.read.file_group.start_timestamp": "resolved value"},
+            {"hoodie.read.file_group.start_timestamp": "not taking"},
+            {"hoodie.read.file_group.start_timestamp": "lower precedence"},
+        ),
+        (
+            {},
+            {"hoodie.read.file_group.start_timestamp": "not taking"},
+            {"hoodie.read.file_group.start_timestamp": "resolved value"},
+        ),
     ],
 )
-def test_read_table_as_of_timestamp(
+def test_setting_table_options(
     get_sample_table, hudi_options, storage_options, options
 ):
     table_path = get_sample_table
@@ -129,37 +137,7 @@ def test_read_table_as_of_timestamp(
         .build()
     )
 
-    batches = table.read_snapshot()
-    t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
-    assert t.to_pylist() == [
-        {
-            "_hoodie_commit_time": "20240402123035233",
-            "ts": 1695046462179,
-            "uuid": "9909a8b1-2d15-4d3d-8ec9-efc48c536a00",
-            "fare": 33.9,
-        },
-        {
-            "_hoodie_commit_time": "20240402123035233",
-            "ts": 1695091554788,
-            "uuid": "e96c4396-3fad-413a-a942-4cb36106d721",
-            "fare": 27.7,
-        },
-        {
-            "_hoodie_commit_time": "20240402123035233",
-            "ts": 1695115999911,
-            "uuid": "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa",
-            "fare": 17.85,
-        },
-        {
-            "_hoodie_commit_time": "20240402123035233",
-            "ts": 1695159649087,
-            "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330",
-            "fare": 19.1,
-        },
-        {
-            "_hoodie_commit_time": "20240402123035233",
-            "ts": 1695516137016,
-            "uuid": "e3cf430c-889d-4015-bc98-59bdce1e530c",
-            "fare": 34.15,
-        },
-    ]
+    assert (
+        table.hudi_options().get("hoodie.read.file_group.start_timestamp")
+        == "resolved value"
+    )
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index c2931aa..2dca24d 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -75,8 +75,10 @@ def test_read_table_can_read_from_batches(get_sample_table):
 
     file_slices = table.get_file_slices()
     file_slice_paths = [f.base_file_relative_path() for f in file_slices]
-    batch = table.create_file_group_reader().read_file_slice_by_base_file_path(
-        file_slice_paths[0]
+    batch = (
+        
table.create_file_group_reader_with_options().read_file_slice_by_base_file_path(
+            file_slice_paths[0]
+        )
     )
     t = pa.Table.from_batches([batch])
     assert t.num_rows == 1

Reply via email to