This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 493697b  feat: support time travel query for MOR tables (#256)
493697b is described below

commit 493697b3c1f22a96d846b924883b1e3b5e5683f1
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jan 21 20:07:36 2025 -0600

    feat: support time travel query for MOR tables (#256)
    
    - Add `InstantRange` to support filtered reading of log blocks
    - Adjust file group reader APIs to accept `InstantRange` for filtering
---
 crates/core/src/file_group/log_file/log_block.rs   |   1 +
 crates/core/src/file_group/log_file/reader.rs      |  60 +++++-
 crates/core/src/file_group/reader.rs               |  18 +-
 crates/core/src/table/mod.rs                       | 158 ++++++---------
 crates/core/src/timeline/mod.rs                    |   6 +-
 crates/core/src/timeline/selector.rs               | 215 ++++++++++++++++++++-
 .../tables/mor/v6_simplekeygen_nonhivestyle.sql    |  93 +++++++++
 .../tables/mor/v6_simplekeygen_nonhivestyle.zip    | Bin 0 -> 37571 bytes
 crates/tests/src/lib.rs                            |  42 +++-
 9 files changed, 474 insertions(+), 119 deletions(-)

diff --git a/crates/core/src/file_group/log_file/log_block.rs 
b/crates/core/src/file_group/log_file/log_block.rs
index ab59a40..e1b685d 100644
--- a/crates/core/src/file_group/log_file/log_block.rs
+++ b/crates/core/src/file_group/log_file/log_block.rs
@@ -137,6 +137,7 @@ pub struct LogBlock {
     pub header: HashMap<BlockMetadataKey, String>,
     pub record_batches: Vec<RecordBatch>,
     pub footer: HashMap<BlockMetadataKey, String>,
+    pub skipped: bool,
 }
 
 impl LogBlock {
diff --git a/crates/core/src/file_group/log_file/reader.rs 
b/crates/core/src/file_group/log_file/reader.rs
index 8a0d439..07a990d 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
 use crate::error::CoreError;
 use crate::file_group::log_file::log_block::{
     BlockMetadataKey, BlockMetadataType, BlockType, LogBlock,
@@ -24,6 +26,7 @@ use crate::file_group::log_file::log_block::{
 use crate::file_group::log_file::log_format::{LogFormatVersion, MAGIC};
 use crate::storage::reader::StorageReader;
 use crate::storage::Storage;
+use crate::timeline::selector::InstantRange;
 use crate::Result;
 use arrow_array::RecordBatch;
 use bytes::BytesMut;
@@ -36,31 +39,48 @@ pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024 * 1024;
 #[allow(dead_code)]
 #[derive(Debug)]
 pub struct LogFileReader<R> {
+    hudi_configs: Arc<HudiConfigs>,
     storage: Arc<Storage>,
     reader: R,
     buffer: BytesMut,
+    timezone: String,
 }
 
 impl LogFileReader<StorageReader> {
-    pub async fn new(storage: Arc<Storage>, relative_path: &str) -> 
Result<Self> {
+    pub async fn new(
+        hudi_configs: Arc<HudiConfigs>,
+        storage: Arc<Storage>,
+        relative_path: &str,
+    ) -> Result<Self> {
         let reader = storage.get_storage_reader(relative_path).await?;
+        let timezone = hudi_configs
+            .get_or_default(HudiTableConfig::TimelineTimezone)
+            .to::<String>();
         Ok(Self {
+            hudi_configs,
             storage,
             reader,
             buffer: BytesMut::with_capacity(DEFAULT_BUFFER_SIZE),
+            timezone,
         })
     }
 
-    fn read_all_blocks(&mut self) -> Result<Vec<LogBlock>> {
+    fn read_all_blocks(&mut self, instant_range: &InstantRange) -> 
Result<Vec<LogBlock>> {
         let mut blocks = Vec::new();
-        while let Some(block) = self.read_next_block()? {
+        while let Some(block) = self.read_next_block(instant_range)? {
+            if block.skipped {
+                continue;
+            }
             blocks.push(block);
         }
         Ok(blocks)
     }
 
-    pub fn read_all_records_unmerged(&mut self) -> Result<Vec<RecordBatch>> {
-        let all_blocks = self.read_all_blocks()?;
+    pub fn read_all_records_unmerged(
+        &mut self,
+        instant_range: &InstantRange,
+    ) -> Result<Vec<RecordBatch>> {
+        let all_blocks = self.read_all_blocks(instant_range)?;
         let mut batches = Vec::new();
         for block in all_blocks {
             batches.extend_from_slice(&block.record_batches);
@@ -217,7 +237,21 @@ impl<R: Read + Seek> LogFileReader<R> {
         Ok(Some(u64::from_be_bytes(size_buf)))
     }
 
-    fn read_next_block(&mut self) -> Result<Option<LogBlock>> {
+    fn should_skip_block(
+        &self,
+        header: &HashMap<BlockMetadataKey, String>,
+        instant_range: &InstantRange,
+    ) -> Result<bool> {
+        let instant_time =
+            header
+                .get(&BlockMetadataKey::InstantTime)
+                .ok_or(CoreError::LogFormatError(
+                    "Instant time not found".to_string(),
+                ))?;
+        instant_range.not_in_range(instant_time, &self.timezone)
+    }
+
+    fn read_next_block(&mut self, instant_range: &InstantRange) -> 
Result<Option<LogBlock>> {
         if !self.read_magic()? {
             return Ok(None);
         }
@@ -231,6 +265,11 @@ impl<R: Read + Seek> LogFileReader<R> {
         let format_version = self.read_format_version()?;
         let block_type = self.read_block_type(&format_version)?;
         let header = self.read_block_metadata(BlockMetadataType::Header, 
&format_version)?;
+        let mut skipped = false;
+        if self.should_skip_block(&header, instant_range)? {
+            skipped = true;
+            // TODO skip reading block
+        }
         let content = self.read_content(&format_version, block_length)?;
         let record_batches = LogBlock::decode_content(&block_type, content)?;
         let footer = self.read_block_metadata(BlockMetadataType::Footer, 
&format_version)?;
@@ -242,6 +281,7 @@ impl<R: Read + Seek> LogFileReader<R> {
             header,
             record_batches,
             footer,
+            skipped,
         }))
     }
 }
@@ -265,9 +305,13 @@ mod tests {
     async fn test_read_sample_log_file() {
         let (dir, file_name) = get_sample_log_file();
         let dir_url = Url::from_directory_path(dir).unwrap();
+        let hudi_configs = Arc::new(HudiConfigs::empty());
         let storage = Storage::new_with_base_url(dir_url).unwrap();
-        let mut reader = LogFileReader::new(storage, 
&file_name).await.unwrap();
-        let blocks = reader.read_all_blocks().unwrap();
+        let mut reader = LogFileReader::new(hudi_configs, storage, &file_name)
+            .await
+            .unwrap();
+        let instant_range = InstantRange::up_to("20250113230424191", "utc");
+        let blocks = reader.read_all_blocks(&instant_range).unwrap();
         assert_eq!(blocks.len(), 1);
 
         let block = &blocks[0];
diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 387b591..ce2ae1c 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -32,18 +32,19 @@ use std::sync::Arc;
 
 use crate::file_group::log_file::reader::LogFileReader;
 use crate::merge::record_merger::RecordMerger;
+use crate::timeline::selector::InstantRange;
 use arrow::compute::filter_record_batch;
 
 /// File group reader handles all read operations against a file group.
 #[derive(Clone, Debug)]
 pub struct FileGroupReader {
-    storage: Arc<Storage>,
     hudi_configs: Arc<HudiConfigs>,
+    storage: Arc<Storage>,
     and_filters: Vec<SchemableFilter>,
 }
 
 impl FileGroupReader {
-    pub fn new(storage: Arc<Storage>, hudi_configs: Arc<HudiConfigs>) -> Self {
+    pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
         Self {
             storage,
             hudi_configs,
@@ -84,7 +85,7 @@ impl FileGroupReader {
         let hudi_configs = Arc::new(HudiConfigs::new(hudi_opts));
 
         let storage = Storage::new(Arc::new(others), hudi_configs.clone())?;
-        Ok(Self::new(storage, hudi_configs))
+        Ok(Self::new(hudi_configs, storage))
     }
 
     fn create_boolean_array_mask(&self, records: &RecordBatch) -> 
Result<BooleanArray> {
@@ -124,6 +125,7 @@ impl FileGroupReader {
         &self,
         file_slice: &FileSlice,
         base_file_only: bool,
+        instant_range: InstantRange,
     ) -> Result<RecordBatch> {
         let relative_path = file_slice.base_file_relative_path()?;
         if base_file_only {
@@ -138,9 +140,11 @@ impl FileGroupReader {
 
             for log_file in &file_slice.log_files {
                 let relative_path = 
file_slice.log_file_relative_path(log_file)?;
+                let hudi_configs = self.hudi_configs.clone();
                 let storage = self.storage.clone();
-                let mut log_file_reader = LogFileReader::new(storage, 
&relative_path).await?;
-                let log_file_records = 
log_file_reader.read_all_records_unmerged()?;
+                let mut log_file_reader =
+                    LogFileReader::new(hudi_configs, storage, 
&relative_path).await?;
+                let log_file_records = 
log_file_reader.read_all_records_unmerged(&instant_range)?;
                 all_records.extend_from_slice(&log_file_records);
             }
 
@@ -165,7 +169,7 @@ mod tests {
     fn test_new() {
         let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
         let storage = Storage::new_with_base_url(base_url).unwrap();
-        let fg_reader = FileGroupReader::new(storage.clone(), 
Arc::from(HudiConfigs::empty()));
+        let fg_reader = FileGroupReader::new(Arc::from(HudiConfigs::empty()), 
storage.clone());
         assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
     }
 
@@ -237,7 +241,7 @@ mod tests {
             
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())
                 .unwrap();
         let empty_configs = Arc::new(HudiConfigs::empty());
-        let reader = FileGroupReader::new(storage, empty_configs.clone());
+        let reader = FileGroupReader::new(empty_configs.clone(), storage);
         let result = reader
             .read_file_slice_by_base_file_path("non_existent_file")
             .await;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 4c92faf..31e25e0 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -105,6 +105,7 @@ use crate::timeline::Timeline;
 use crate::Result;
 
 use crate::metadata::meta_field::MetaField;
+use crate::timeline::selector::InstantRange;
 use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
 use std::collections::{HashMap, HashSet};
@@ -147,6 +148,13 @@ impl Table {
             .map_err(CoreError::from)
     }
 
+    #[inline]
+    pub fn timezone(&self) -> String {
+        self.hudi_configs
+            .get_or_default(HudiTableConfig::TimelineTimezone)
+            .to::<String>()
+    }
+
     pub fn hudi_options(&self) -> HashMap<String, String> {
         self.hudi_configs.as_options()
     }
@@ -259,8 +267,8 @@ impl Table {
 
     pub fn create_file_group_reader(&self) -> FileGroupReader {
         FileGroupReader::new(
-            self.file_system_view.storage.clone(),
             self.hudi_configs.clone(),
+            self.file_system_view.storage.clone(),
         )
     }
 
@@ -300,10 +308,12 @@ impl Table {
         let file_slices = self.get_file_slices_as_of(timestamp, 
filters).await?;
         let fg_reader = self.create_file_group_reader();
         let base_file_only = self.get_table_type() == 
TableTypeValue::CopyOnWrite;
+        let timezone = self.timezone();
+        let instant_range = InstantRange::up_to(timestamp, &timezone);
         let batches = futures::future::try_join_all(
             file_slices
                 .iter()
-                .map(|f| fg_reader.read_file_slice(f, base_file_only)),
+                .map(|f| fg_reader.read_file_slice(f, base_file_only, 
instant_range.clone())),
         )
         .await?;
         Ok(batches)
@@ -342,10 +352,12 @@ impl Table {
         let fg_reader =
             self.create_file_group_reader_with_filters(filters, 
MetaField::schema().as_ref())?;
         let base_file_only = self.get_table_type() == 
TableTypeValue::CopyOnWrite;
+        let timezone = self.timezone();
+        let instant_range = InstantRange::up_to(as_of_timestamp, &timezone);
         let batches = futures::future::try_join_all(
             file_slices
                 .iter()
-                .map(|f| fg_reader.read_file_slice(f, base_file_only)),
+                .map(|f| fg_reader.read_file_slice(f, base_file_only, 
instant_range.clone())),
         )
         .await?;
         Ok(batches)
@@ -896,19 +908,14 @@ mod tests {
         assert_eq!(actual, expected);
     }
 
-    mod test_snapshot_queries {
+    mod test_snapshot_and_time_travel_queries {
         use super::super::*;
-        use crate::metadata::meta_field::MetaField;
-        use arrow_array::{Array, BooleanArray, Int32Array, StringArray};
+        use arrow::compute::concat_batches;
         use hudi_tests::SampleTable;
 
         #[tokio::test]
         async fn test_empty() -> Result<()> {
-            let base_urls = [
-                SampleTable::V6Empty.url_to_cow(),
-                SampleTable::V6Empty.url_to_mor(),
-            ];
-            for base_url in base_urls.iter() {
+            for base_url in SampleTable::V6Empty.urls() {
                 let hudi_table = Table::new(base_url.path()).await?;
                 let records = hudi_table.read_snapshot(&[]).await?;
                 assert!(records.is_empty());
@@ -918,110 +925,67 @@ mod tests {
 
         #[tokio::test]
         async fn test_non_partitioned() -> Result<()> {
-            let base_urls = [
-                SampleTable::V6Nonpartitioned.url_to_cow(),
-                SampleTable::V6Nonpartitioned.url_to_mor(),
-            ];
-            for base_url in base_urls.iter() {
+            for base_url in SampleTable::V6Nonpartitioned.urls() {
                 let hudi_table = Table::new(base_url.path()).await?;
                 let records = hudi_table.read_snapshot(&[]).await?;
-                let all_records = 
arrow::compute::concat_batches(&records[0].schema(), &records)?;
-
-                let ids = all_records
-                    .column_by_name("id")
-                    .unwrap()
-                    .as_any()
-                    .downcast_ref::<Int32Array>()
-                    .unwrap();
-                let names = all_records
-                    .column_by_name("name")
-                    .unwrap()
-                    .as_any()
-                    .downcast_ref::<StringArray>()
-                    .unwrap();
-                let is_actives = all_records
-                    .column_by_name("isActive")
-                    .unwrap()
-                    .as_any()
-                    .downcast_ref::<BooleanArray>()
-                    .unwrap();
+                let schema = &records[0].schema();
+                let records = concat_batches(schema, &records)?;
 
-                let mut data: Vec<(i32, &str, bool)> = ids
-                    .iter()
-                    .zip(names.iter())
-                    .zip(is_actives.iter())
-                    .map(|((id, name), is_active)| (id.unwrap(), 
name.unwrap(), is_active.unwrap()))
-                    .collect();
-                data.sort_unstable_by_key(|(id, _, _)| *id);
+                let sample_data = 
SampleTable::sample_data_order_by_id(&records);
                 assert_eq!(
-                    data,
+                    sample_data,
                     vec![
                         (1, "Alice", false),
                         (2, "Bob", false),
                         (3, "Carol", true),
                         (4, "Diana", true),
                     ]
-                )
+                );
             }
             Ok(())
         }
 
         #[tokio::test]
         async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
-            let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_mor();
-            let hudi_table = Table::new(base_url.path()).await?;
-
-            let filters = &[
-                Filter::try_from(("byteField", ">=", "10"))?,
-                Filter::try_from(("byteField", "<", "20"))?,
-                Filter::try_from(("shortField", "!=", "100"))?,
-            ];
-            let records = hudi_table.read_snapshot(filters).await?;
-            let all_records = 
arrow::compute::concat_batches(&records[0].schema(), &records)?;
-
-            let partition_paths = all_records
-                .column_by_name(MetaField::PartitionPath.as_ref())
-                .unwrap()
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .unwrap();
-            let ids = all_records
-                .column_by_name("id")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<Int32Array>()
-                .unwrap();
-            let names = all_records
-                .column_by_name("name")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .unwrap();
-            let is_actives = all_records
-                .column_by_name("isActive")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<BooleanArray>()
-                .unwrap();
+            for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
+                let hudi_table = Table::new(base_url.path()).await?;
 
-            let mut data: Vec<(&str, i32, &str, bool)> = partition_paths
-                .iter()
-                .zip(ids.iter())
-                .zip(names.iter())
-                .zip(is_actives.iter())
-                .map(|(((pt, id), name), is_active)| {
-                    (pt.unwrap(), id.unwrap(), name.unwrap(), 
is_active.unwrap())
-                })
-                .collect();
-            data.sort_unstable_by_key(|(_, id, _, _)| *id);
-            assert_eq!(
-                data,
-                vec![
-                    ("byteField=10/shortField=300", 1, "Alice", false),
-                    ("byteField=10/shortField=300", 3, "Carol", true),
-                ]
-            );
+                let filters = &[
+                    Filter::try_from(("byteField", ">=", "10"))?,
+                    Filter::try_from(("byteField", "<", "20"))?,
+                    Filter::try_from(("shortField", "!=", "100"))?,
+                ];
+                let records = hudi_table.read_snapshot(filters).await?;
+                let schema = &records[0].schema();
+                let records = concat_batches(schema, &records)?;
+
+                let sample_data = 
SampleTable::sample_data_order_by_id(&records);
+                assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol", 
true),]);
+            }
+            Ok(())
+        }
 
+        #[tokio::test]
+        async fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
+            for base_url in 
&[SampleTable::V6SimplekeygenNonhivestyle.url_to_mor()] {
+                let hudi_table = Table::new(base_url.path()).await?;
+                let commit_timestamps = hudi_table
+                    .timeline
+                    .completed_commits
+                    .iter()
+                    .map(|i| i.timestamp.as_str())
+                    .collect::<Vec<_>>();
+                let first_commit = commit_timestamps[0];
+                let records = hudi_table.read_snapshot_as_of(first_commit, 
&[]).await?;
+                let schema = &records[0].schema();
+                let records = concat_batches(schema, &records)?;
+
+                let sample_data = 
SampleTable::sample_data_order_by_id(&records);
+                assert_eq!(
+                    sample_data,
+                    vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", 
true),]
+                );
+            }
             Ok(())
         }
     }
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index e1fc486..262d05a 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -17,7 +17,7 @@
  * under the License.
  */
 mod instant;
-mod selector;
+pub(crate) mod selector;
 
 use crate::config::HudiConfigs;
 use crate::error::CoreError;
@@ -54,8 +54,8 @@ impl Timeline {
     ) -> Result<Self> {
         let storage = Storage::new(storage_options.clone(), 
hudi_configs.clone())?;
         Ok(Self {
-            storage,
             hudi_configs,
+            storage,
             completed_commits,
         })
     }
@@ -68,8 +68,8 @@ impl Timeline {
         let selector = 
TimelineSelector::completed_commits(hudi_configs.clone())?;
         let completed_commits = Self::load_instants(&selector, 
&storage).await?;
         Ok(Self {
-            storage,
             hudi_configs,
+            storage,
             completed_commits,
         })
     }
diff --git a/crates/core/src/timeline/selector.rs 
b/crates/core/src/timeline/selector.rs
index e49e374..ff117a7 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -25,6 +25,91 @@ use crate::Result;
 use chrono::{DateTime, Utc};
 use std::sync::Arc;
 
+#[derive(Debug, Clone)]
+pub struct InstantRange {
+    timezone: String,
+    start_timestamp: Option<String>,
+    end_timestamp: Option<String>,
+    start_inclusive: bool,
+    end_inclusive: bool,
+}
+
+impl InstantRange {
+    pub fn new(
+        timezone: String,
+        start_timestamp: Option<String>,
+        end_timestamp: Option<String>,
+        start_inclusive: bool,
+        end_inclusive: bool,
+    ) -> Self {
+        Self {
+            timezone,
+            start_timestamp,
+            end_timestamp,
+            start_inclusive,
+            end_inclusive,
+        }
+    }
+
+    /// Create a new [InstantRange] with an end timestamp inclusive.
+    pub fn up_to(end_timestamp: &str, timezone: &str) -> Self {
+        Self::new(
+            timezone.to_string(),
+            None,
+            Some(end_timestamp.to_string()),
+            false,
+            true,
+        )
+    }
+
+    pub fn timezone(&self) -> &str {
+        &self.timezone
+    }
+
+    pub fn start_timestamp(&self) -> Result<Option<DateTime<Utc>>> {
+        self.start_timestamp
+            .as_deref()
+            .map(|timestamp| Instant::parse_datetime(timestamp, 
&self.timezone))
+            .transpose()
+    }
+
+    pub fn end_timestamp(&self) -> Result<Option<DateTime<Utc>>> {
+        self.end_timestamp
+            .as_deref()
+            .map(|timestamp| Instant::parse_datetime(timestamp, 
&self.timezone))
+            .transpose()
+    }
+
+    pub fn is_in_range(&self, timestamp: &str, timezone: &str) -> Result<bool> 
{
+        let t = Instant::parse_datetime(timestamp, timezone)?;
+        if let Some(start) = self.start_timestamp()? {
+            if self.start_inclusive {
+                if t < start {
+                    return Ok(false);
+                }
+            } else if t <= start {
+                return Ok(false);
+            }
+        }
+
+        if let Some(end) = self.end_timestamp()? {
+            if self.end_inclusive {
+                if t > end {
+                    return Ok(false);
+                }
+            } else if t >= end {
+                return Ok(false);
+            }
+        }
+
+        Ok(true)
+    }
+
+    pub fn not_in_range(&self, timestamp: &str, timezone: &str) -> 
Result<bool> {
+        Ok(!self.is_in_range(timestamp, timezone)?)
+    }
+}
+
 #[allow(dead_code)]
 #[derive(Debug, Clone, PartialEq)]
 pub struct TimelineSelector {
@@ -38,7 +123,7 @@ pub struct TimelineSelector {
 
 #[allow(dead_code)]
 impl TimelineSelector {
-    fn get_timezone_from_configs(hudi_configs: Arc<HudiConfigs>) -> String {
+    fn get_timezone_from_configs(hudi_configs: &HudiConfigs) -> String {
         hudi_configs
             .get_or_default(HudiTableConfig::TimelineTimezone)
             .to::<String>()
@@ -53,7 +138,7 @@ impl TimelineSelector {
         start: Option<&str>,
         end: Option<&str>,
     ) -> Result<Self> {
-        let timezone = Self::get_timezone_from_configs(hudi_configs);
+        let timezone = Self::get_timezone_from_configs(&hudi_configs);
         let start_datetime = start
             .map(|s| Instant::parse_datetime(s, &timezone))
             .transpose()?;
@@ -72,7 +157,7 @@ impl TimelineSelector {
 
     pub fn completed_replacecommits(hudi_configs: Arc<HudiConfigs>) -> Self {
         Self {
-            timezone: Self::get_timezone_from_configs(hudi_configs),
+            timezone: Self::get_timezone_from_configs(&hudi_configs),
             start_datetime: None,
             end_datetime: None,
             states: vec![State::Completed],
@@ -191,6 +276,130 @@ mod tests {
     use std::str::FromStr;
     use std::sync::Arc;
 
+    #[test]
+    fn test_new_instant_range() {
+        let range = InstantRange::new(
+            "UTC".to_string(),
+            Some("20240101000000000".to_string()),
+            Some("20241231235959999".to_string()),
+            true,
+            false,
+        );
+
+        assert_eq!(range.timezone(), "UTC");
+        assert_eq!(range.start_timestamp.as_deref(), 
Some("20240101000000000"));
+        assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
+        assert!(range.start_inclusive);
+        assert!(!range.end_inclusive);
+    }
+
+    #[test]
+    fn test_up_to() {
+        let range = InstantRange::up_to("20241231235959999", "UTC");
+
+        assert_eq!(range.timezone(), "UTC");
+        assert!(range.start_timestamp.is_none());
+        assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
+        assert!(!range.start_inclusive);
+        assert!(range.end_inclusive);
+    }
+
+    #[test]
+    fn test_is_in_range_inclusive_bounds() {
+        let range = InstantRange::new(
+            "UTC".to_string(),
+            Some("20240101000000000".to_string()),
+            Some("20241231235959999".to_string()),
+            true,
+            true,
+        );
+
+        // Test exact bounds
+        assert!(range.is_in_range("20240101000000000", "UTC").unwrap());
+        assert!(range.is_in_range("20241231235959999", "UTC").unwrap());
+
+        // Test inside range
+        assert!(range.is_in_range("20240615120000000", "UTC").unwrap());
+
+        // Test outside range
+        assert!(!range.is_in_range("20231231235959999", "UTC").unwrap());
+        assert!(!range.is_in_range("20250101000000000", "UTC").unwrap());
+    }
+
+    #[test]
+    fn test_is_in_range_exclusive_bounds() {
+        let range = InstantRange::new(
+            "UTC".to_string(),
+            Some("20240101000000000".to_string()),
+            Some("20241231235959999".to_string()),
+            false,
+            false,
+        );
+
+        // Test exact bounds
+        assert!(!range.is_in_range("20240101000000000", "UTC").unwrap());
+        assert!(!range.is_in_range("20241231235959999", "UTC").unwrap());
+
+        // Test inside range
+        assert!(range.is_in_range("20240615120000000", "UTC").unwrap());
+    }
+
+    #[test]
+    fn test_not_in_range() {
+        let range = InstantRange::new(
+            "UTC".to_string(),
+            Some("20240101000000000".to_string()),
+            Some("20241231235959999".to_string()),
+            true,
+            true,
+        );
+
+        assert!(!range.not_in_range("20240615120000000", "UTC").unwrap());
+        assert!(range.not_in_range("20231231235959999", "UTC").unwrap());
+    }
+
+    #[test]
+    fn test_invalid_timestamp_format() {
+        let range = InstantRange::new(
+            "UTC".to_string(),
+            Some("20240101000000000".to_string()),
+            Some("20241231235959999".to_string()),
+            true,
+            true,
+        );
+
+        assert!(range.is_in_range("invalid_timestamp", "UTC").is_err());
+    }
+
+    #[test]
+    fn test_invalid_timezone() {
+        let range = InstantRange::new(
+            "Invalid/Timezone".to_string(),
+            Some("20240101000000000".to_string()),
+            Some("20241231235959999".to_string()),
+            true,
+            true,
+        );
+
+        assert!(range.is_in_range("20240615120000000", "UTC").is_err());
+    }
+
+    #[test]
+    fn test_millisecond_precision() {
+        let range = InstantRange::new(
+            "UTC".to_string(),
+            Some("20240101000000000".to_string()),
+            Some("20240101000000999".to_string()),
+            true,
+            true,
+        );
+
+        assert!(range.is_in_range("20240101000000000", "UTC").unwrap());
+        assert!(range.is_in_range("20240101000000500", "UTC").unwrap());
+        assert!(range.is_in_range("20240101000000999", "UTC").unwrap());
+        assert!(!range.is_in_range("20240101000001000", "UTC").unwrap());
+    }
+
     fn create_test_selector(
         actions: &[Action],
         states: &[State],
diff --git a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql 
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
new file mode 100644
index 0000000..1c7c3bc
--- /dev/null
+++ b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
-- MOR fixture table (Hudi table version 6), SIMPLE key generator,
-- non-hive-style partitioning on byteField, parquet log blocks.
CREATE TABLE v6_simplekeygen_nonhivestyle (
    id INT,
    name STRING,
    isActive BOOLEAN,
    shortField SHORT,
    intField INT,
    longField LONG,
    floatField FLOAT,
    doubleField DOUBLE,
    decimalField DECIMAL(10,5),
    dateField DATE,
    timestampField TIMESTAMP,
    binaryField BINARY,
    arrayField ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>,  -- Array of structs
    mapField MAP<STRING, STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>,  -- Map with struct values
    structField STRUCT<
        field1: STRING,
        field2: INT,
        child_struct: STRUCT<
            child_field1: DOUBLE,
            child_field2: BOOLEAN
        >
    >,
    byteField BYTE
)
USING HUDI
LOCATION '/opt/data/external_tables/v6_simplekeygen_nonhivestyle'
TBLPROPERTIES (
    type = 'mor',
    primaryKey = 'id',
    preCombineField = 'longField',
    'hoodie.metadata.enable' = 'false',
    'hoodie.datasource.write.hive_style_partitioning' = 'false',
    'hoodie.datasource.write.drop.partition.columns' = 'false',
    'hoodie.table.log.file.format' = 'PARQUET',
    'hoodie.logfile.data.block.format' = 'parquet',
    'hoodie.datasource.write.record.merger.impls' = 'org.apache.hudi.HoodieSparkRecordMerger',
    'hoodie.parquet.small.file.limit' = '0'
)
PARTITIONED BY (byteField);
+
-- First commit: three rows spread over partitions 10, 20, 10.
INSERT INTO v6_simplekeygen_nonhivestyle VALUES
    (1, 'Alice', true, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890,
     CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
     ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
     MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
     STRUCT('Alice', 30, STRUCT(123.456, true)),
     10),
    (2, 'Bob', false, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345,
     CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
     ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
     MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
     STRUCT('Bob', 40, STRUCT(789.012, false)),
     20),
    (3, 'Carol', true, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222,
     CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS BINARY),
     ARRAY(STRUCT('black', 600), STRUCT('white', 700), STRUCT('pink', 800)),
     MAP('key5', STRUCT(345.678, true), 'key6', STRUCT(654.321, false)),
     STRUCT('Carol', 25, STRUCT(456.789, true)),
     10);
+
-- Second commit: updates Alice (isActive flips to false) and adds Diana
-- in a new partition (30) — exercises MOR log-file merging.
INSERT INTO v6_simplekeygen_nonhivestyle VALUES
    (1, 'Alice', false, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890,
     CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
     ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
     MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
     STRUCT('Alice', 30, STRUCT(123.456, true)),
     10),
    (4, 'Diana', true, 500, 45000, 987654321, 4.0, 2.468, 65432.12345,
     CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
     ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
     MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
     STRUCT('Diana', 50, STRUCT(987.654, true)),
     30);
diff --git a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.zip 
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.zip
new file mode 100644
index 0000000..aef8e72
Binary files /dev/null and 
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.zip differ
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index a7b1e7b..c5562d2 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -17,10 +17,10 @@
  * under the License.
  */
 
+use arrow_array::{BooleanArray, Int32Array, RecordBatch, StringArray};
 use std::fs;
 use std::io::Cursor;
 use std::path::{Path, PathBuf};
-
 use strum_macros::{AsRefStr, EnumIter, EnumString};
 use tempfile::tempdir;
 use url::Url;
@@ -34,6 +34,7 @@ pub fn extract_test_table(zip_path: &Path) -> PathBuf {
     target_dir
 }
 
+#[allow(dead_code)]
 #[derive(Debug, EnumString, AsRefStr, EnumIter)]
 #[strum(serialize_all = "snake_case")]
 pub enum SampleTable {
@@ -47,6 +48,37 @@ pub enum SampleTable {
 }
 
 impl SampleTable {
+    /// Return rows of columns (id, name, isActive) for the given 
[RecordBatch] order by id.
+    pub fn sample_data_order_by_id(record_batch: &RecordBatch) -> Vec<(i32, 
&str, bool)> {
+        let ids = record_batch
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let names = record_batch
+            .column_by_name("name")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let is_actives = record_batch
+            .column_by_name("isActive")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .unwrap();
+
+        let mut data: Vec<(i32, &str, bool)> = ids
+            .iter()
+            .zip(names.iter())
+            .zip(is_actives.iter())
+            .map(|((id, name), is_active)| (id.unwrap(), name.unwrap(), 
is_active.unwrap()))
+            .collect();
+        data.sort_unstable_by_key(|(id, _, _)| *id);
+        data
+    }
+
     fn zip_path(&self, table_type: &str) -> Box<Path> {
         let dir = env!("CARGO_MANIFEST_DIR");
         let data_path = Path::new(dir)
@@ -68,6 +100,10 @@ impl SampleTable {
         path_buf.to_str().unwrap().to_string()
     }
 
+    pub fn paths(&self) -> Vec<String> {
+        vec![self.path_to_cow(), self.path_to_mor()]
+    }
+
     pub fn url_to_cow(&self) -> Url {
         let path = self.path_to_cow();
         Url::from_file_path(path).unwrap()
@@ -77,6 +113,10 @@ impl SampleTable {
         let path = self.path_to_mor();
         Url::from_file_path(path).unwrap()
     }
+
+    pub fn urls(&self) -> Vec<Url> {
+        vec![self.url_to_cow(), self.url_to_mor()]
+    }
 }
 
 #[cfg(test)]


Reply via email to