This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 493697b feat: support time travel query for MOR tables (#256)
493697b is described below
commit 493697b3c1f22a96d846b924883b1e3b5e5683f1
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jan 21 20:07:36 2025 -0600
feat: support time travel query for MOR tables (#256)
- Add `InstantRange` to support filtered reading of log blocks
- Adjust file group reader APIs to accept `InstantRange` for filtering
---
crates/core/src/file_group/log_file/log_block.rs | 1 +
crates/core/src/file_group/log_file/reader.rs | 60 +++++-
crates/core/src/file_group/reader.rs | 18 +-
crates/core/src/table/mod.rs | 158 ++++++---------
crates/core/src/timeline/mod.rs | 6 +-
crates/core/src/timeline/selector.rs | 215 ++++++++++++++++++++-
.../tables/mor/v6_simplekeygen_nonhivestyle.sql | 93 +++++++++
.../tables/mor/v6_simplekeygen_nonhivestyle.zip | Bin 0 -> 37571 bytes
crates/tests/src/lib.rs | 42 +++-
9 files changed, 474 insertions(+), 119 deletions(-)
diff --git a/crates/core/src/file_group/log_file/log_block.rs
b/crates/core/src/file_group/log_file/log_block.rs
index ab59a40..e1b685d 100644
--- a/crates/core/src/file_group/log_file/log_block.rs
+++ b/crates/core/src/file_group/log_file/log_block.rs
@@ -137,6 +137,7 @@ pub struct LogBlock {
pub header: HashMap<BlockMetadataKey, String>,
pub record_batches: Vec<RecordBatch>,
pub footer: HashMap<BlockMetadataKey, String>,
+ pub skipped: bool,
}
impl LogBlock {
diff --git a/crates/core/src/file_group/log_file/reader.rs
b/crates/core/src/file_group/log_file/reader.rs
index 8a0d439..07a990d 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -17,6 +17,8 @@
* under the License.
*/
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::file_group::log_file::log_block::{
BlockMetadataKey, BlockMetadataType, BlockType, LogBlock,
@@ -24,6 +26,7 @@ use crate::file_group::log_file::log_block::{
use crate::file_group::log_file::log_format::{LogFormatVersion, MAGIC};
use crate::storage::reader::StorageReader;
use crate::storage::Storage;
+use crate::timeline::selector::InstantRange;
use crate::Result;
use arrow_array::RecordBatch;
use bytes::BytesMut;
@@ -36,31 +39,48 @@ pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024 * 1024;
#[allow(dead_code)]
#[derive(Debug)]
pub struct LogFileReader<R> {
+ hudi_configs: Arc<HudiConfigs>,
storage: Arc<Storage>,
reader: R,
buffer: BytesMut,
+ timezone: String,
}
impl LogFileReader<StorageReader> {
- pub async fn new(storage: Arc<Storage>, relative_path: &str) ->
Result<Self> {
+ pub async fn new(
+ hudi_configs: Arc<HudiConfigs>,
+ storage: Arc<Storage>,
+ relative_path: &str,
+ ) -> Result<Self> {
let reader = storage.get_storage_reader(relative_path).await?;
+ let timezone = hudi_configs
+ .get_or_default(HudiTableConfig::TimelineTimezone)
+ .to::<String>();
Ok(Self {
+ hudi_configs,
storage,
reader,
buffer: BytesMut::with_capacity(DEFAULT_BUFFER_SIZE),
+ timezone,
})
}
- fn read_all_blocks(&mut self) -> Result<Vec<LogBlock>> {
+ fn read_all_blocks(&mut self, instant_range: &InstantRange) ->
Result<Vec<LogBlock>> {
let mut blocks = Vec::new();
- while let Some(block) = self.read_next_block()? {
+ while let Some(block) = self.read_next_block(instant_range)? {
+ if block.skipped {
+ continue;
+ }
blocks.push(block);
}
Ok(blocks)
}
- pub fn read_all_records_unmerged(&mut self) -> Result<Vec<RecordBatch>> {
- let all_blocks = self.read_all_blocks()?;
+ pub fn read_all_records_unmerged(
+ &mut self,
+ instant_range: &InstantRange,
+ ) -> Result<Vec<RecordBatch>> {
+ let all_blocks = self.read_all_blocks(instant_range)?;
let mut batches = Vec::new();
for block in all_blocks {
batches.extend_from_slice(&block.record_batches);
@@ -217,7 +237,21 @@ impl<R: Read + Seek> LogFileReader<R> {
Ok(Some(u64::from_be_bytes(size_buf)))
}
- fn read_next_block(&mut self) -> Result<Option<LogBlock>> {
+ fn should_skip_block(
+ &self,
+ header: &HashMap<BlockMetadataKey, String>,
+ instant_range: &InstantRange,
+ ) -> Result<bool> {
+ let instant_time =
+ header
+ .get(&BlockMetadataKey::InstantTime)
+ .ok_or(CoreError::LogFormatError(
+ "Instant time not found".to_string(),
+ ))?;
+ instant_range.not_in_range(instant_time, &self.timezone)
+ }
+
+ fn read_next_block(&mut self, instant_range: &InstantRange) ->
Result<Option<LogBlock>> {
if !self.read_magic()? {
return Ok(None);
}
@@ -231,6 +265,11 @@ impl<R: Read + Seek> LogFileReader<R> {
let format_version = self.read_format_version()?;
let block_type = self.read_block_type(&format_version)?;
let header = self.read_block_metadata(BlockMetadataType::Header,
&format_version)?;
+ let mut skipped = false;
+ if self.should_skip_block(&header, instant_range)? {
+ skipped = true;
+ // TODO skip reading block
+ }
let content = self.read_content(&format_version, block_length)?;
let record_batches = LogBlock::decode_content(&block_type, content)?;
let footer = self.read_block_metadata(BlockMetadataType::Footer,
&format_version)?;
@@ -242,6 +281,7 @@ impl<R: Read + Seek> LogFileReader<R> {
header,
record_batches,
footer,
+ skipped,
}))
}
}
@@ -265,9 +305,13 @@ mod tests {
async fn test_read_sample_log_file() {
let (dir, file_name) = get_sample_log_file();
let dir_url = Url::from_directory_path(dir).unwrap();
+ let hudi_configs = Arc::new(HudiConfigs::empty());
let storage = Storage::new_with_base_url(dir_url).unwrap();
- let mut reader = LogFileReader::new(storage,
&file_name).await.unwrap();
- let blocks = reader.read_all_blocks().unwrap();
+ let mut reader = LogFileReader::new(hudi_configs, storage, &file_name)
+ .await
+ .unwrap();
+ let instant_range = InstantRange::up_to("20250113230424191", "utc");
+ let blocks = reader.read_all_blocks(&instant_range).unwrap();
assert_eq!(blocks.len(), 1);
let block = &blocks[0];
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index 387b591..ce2ae1c 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -32,18 +32,19 @@ use std::sync::Arc;
use crate::file_group::log_file::reader::LogFileReader;
use crate::merge::record_merger::RecordMerger;
+use crate::timeline::selector::InstantRange;
use arrow::compute::filter_record_batch;
/// File group reader handles all read operations against a file group.
#[derive(Clone, Debug)]
pub struct FileGroupReader {
- storage: Arc<Storage>,
hudi_configs: Arc<HudiConfigs>,
+ storage: Arc<Storage>,
and_filters: Vec<SchemableFilter>,
}
impl FileGroupReader {
- pub fn new(storage: Arc<Storage>, hudi_configs: Arc<HudiConfigs>) -> Self {
+ pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
Self {
storage,
hudi_configs,
@@ -84,7 +85,7 @@ impl FileGroupReader {
let hudi_configs = Arc::new(HudiConfigs::new(hudi_opts));
let storage = Storage::new(Arc::new(others), hudi_configs.clone())?;
- Ok(Self::new(storage, hudi_configs))
+ Ok(Self::new(hudi_configs, storage))
}
fn create_boolean_array_mask(&self, records: &RecordBatch) ->
Result<BooleanArray> {
@@ -124,6 +125,7 @@ impl FileGroupReader {
&self,
file_slice: &FileSlice,
base_file_only: bool,
+ instant_range: InstantRange,
) -> Result<RecordBatch> {
let relative_path = file_slice.base_file_relative_path()?;
if base_file_only {
@@ -138,9 +140,11 @@ impl FileGroupReader {
for log_file in &file_slice.log_files {
let relative_path =
file_slice.log_file_relative_path(log_file)?;
+ let hudi_configs = self.hudi_configs.clone();
let storage = self.storage.clone();
- let mut log_file_reader = LogFileReader::new(storage,
&relative_path).await?;
- let log_file_records =
log_file_reader.read_all_records_unmerged()?;
+ let mut log_file_reader =
+ LogFileReader::new(hudi_configs, storage,
&relative_path).await?;
+ let log_file_records =
log_file_reader.read_all_records_unmerged(&instant_range)?;
all_records.extend_from_slice(&log_file_records);
}
@@ -165,7 +169,7 @@ mod tests {
fn test_new() {
let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
let storage = Storage::new_with_base_url(base_url).unwrap();
- let fg_reader = FileGroupReader::new(storage.clone(),
Arc::from(HudiConfigs::empty()));
+ let fg_reader = FileGroupReader::new(Arc::from(HudiConfigs::empty()),
storage.clone());
assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
}
@@ -237,7 +241,7 @@ mod tests {
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())
.unwrap();
let empty_configs = Arc::new(HudiConfigs::empty());
- let reader = FileGroupReader::new(storage, empty_configs.clone());
+ let reader = FileGroupReader::new(empty_configs.clone(), storage);
let result = reader
.read_file_slice_by_base_file_path("non_existent_file")
.await;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 4c92faf..31e25e0 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -105,6 +105,7 @@ use crate::timeline::Timeline;
use crate::Result;
use crate::metadata::meta_field::MetaField;
+use crate::timeline::selector::InstantRange;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use std::collections::{HashMap, HashSet};
@@ -147,6 +148,13 @@ impl Table {
.map_err(CoreError::from)
}
+ #[inline]
+ pub fn timezone(&self) -> String {
+ self.hudi_configs
+ .get_or_default(HudiTableConfig::TimelineTimezone)
+ .to::<String>()
+ }
+
pub fn hudi_options(&self) -> HashMap<String, String> {
self.hudi_configs.as_options()
}
@@ -259,8 +267,8 @@ impl Table {
pub fn create_file_group_reader(&self) -> FileGroupReader {
FileGroupReader::new(
- self.file_system_view.storage.clone(),
self.hudi_configs.clone(),
+ self.file_system_view.storage.clone(),
)
}
@@ -300,10 +308,12 @@ impl Table {
let file_slices = self.get_file_slices_as_of(timestamp,
filters).await?;
let fg_reader = self.create_file_group_reader();
let base_file_only = self.get_table_type() ==
TableTypeValue::CopyOnWrite;
+ let timezone = self.timezone();
+ let instant_range = InstantRange::up_to(timestamp, &timezone);
let batches = futures::future::try_join_all(
file_slices
.iter()
- .map(|f| fg_reader.read_file_slice(f, base_file_only)),
+ .map(|f| fg_reader.read_file_slice(f, base_file_only,
instant_range.clone())),
)
.await?;
Ok(batches)
@@ -342,10 +352,12 @@ impl Table {
let fg_reader =
self.create_file_group_reader_with_filters(filters,
MetaField::schema().as_ref())?;
let base_file_only = self.get_table_type() ==
TableTypeValue::CopyOnWrite;
+ let timezone = self.timezone();
+ let instant_range = InstantRange::up_to(as_of_timestamp, &timezone);
let batches = futures::future::try_join_all(
file_slices
.iter()
- .map(|f| fg_reader.read_file_slice(f, base_file_only)),
+ .map(|f| fg_reader.read_file_slice(f, base_file_only,
instant_range.clone())),
)
.await?;
Ok(batches)
@@ -896,19 +908,14 @@ mod tests {
assert_eq!(actual, expected);
}
- mod test_snapshot_queries {
+ mod test_snapshot_and_time_travel_queries {
use super::super::*;
- use crate::metadata::meta_field::MetaField;
- use arrow_array::{Array, BooleanArray, Int32Array, StringArray};
+ use arrow::compute::concat_batches;
use hudi_tests::SampleTable;
#[tokio::test]
async fn test_empty() -> Result<()> {
- let base_urls = [
- SampleTable::V6Empty.url_to_cow(),
- SampleTable::V6Empty.url_to_mor(),
- ];
- for base_url in base_urls.iter() {
+ for base_url in SampleTable::V6Empty.urls() {
let hudi_table = Table::new(base_url.path()).await?;
let records = hudi_table.read_snapshot(&[]).await?;
assert!(records.is_empty());
@@ -918,110 +925,67 @@ mod tests {
#[tokio::test]
async fn test_non_partitioned() -> Result<()> {
- let base_urls = [
- SampleTable::V6Nonpartitioned.url_to_cow(),
- SampleTable::V6Nonpartitioned.url_to_mor(),
- ];
- for base_url in base_urls.iter() {
+ for base_url in SampleTable::V6Nonpartitioned.urls() {
let hudi_table = Table::new(base_url.path()).await?;
let records = hudi_table.read_snapshot(&[]).await?;
- let all_records =
arrow::compute::concat_batches(&records[0].schema(), &records)?;
-
- let ids = all_records
- .column_by_name("id")
- .unwrap()
- .as_any()
- .downcast_ref::<Int32Array>()
- .unwrap();
- let names = all_records
- .column_by_name("name")
- .unwrap()
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- let is_actives = all_records
- .column_by_name("isActive")
- .unwrap()
- .as_any()
- .downcast_ref::<BooleanArray>()
- .unwrap();
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
- let mut data: Vec<(i32, &str, bool)> = ids
- .iter()
- .zip(names.iter())
- .zip(is_actives.iter())
- .map(|((id, name), is_active)| (id.unwrap(),
name.unwrap(), is_active.unwrap()))
- .collect();
- data.sort_unstable_by_key(|(id, _, _)| *id);
+ let sample_data =
SampleTable::sample_data_order_by_id(&records);
assert_eq!(
- data,
+ sample_data,
vec![
(1, "Alice", false),
(2, "Bob", false),
(3, "Carol", true),
(4, "Diana", true),
]
- )
+ );
}
Ok(())
}
#[tokio::test]
async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
- let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_mor();
- let hudi_table = Table::new(base_url.path()).await?;
-
- let filters = &[
- Filter::try_from(("byteField", ">=", "10"))?,
- Filter::try_from(("byteField", "<", "20"))?,
- Filter::try_from(("shortField", "!=", "100"))?,
- ];
- let records = hudi_table.read_snapshot(filters).await?;
- let all_records =
arrow::compute::concat_batches(&records[0].schema(), &records)?;
-
- let partition_paths = all_records
- .column_by_name(MetaField::PartitionPath.as_ref())
- .unwrap()
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- let ids = all_records
- .column_by_name("id")
- .unwrap()
- .as_any()
- .downcast_ref::<Int32Array>()
- .unwrap();
- let names = all_records
- .column_by_name("name")
- .unwrap()
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- let is_actives = all_records
- .column_by_name("isActive")
- .unwrap()
- .as_any()
- .downcast_ref::<BooleanArray>()
- .unwrap();
+ for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
- let mut data: Vec<(&str, i32, &str, bool)> = partition_paths
- .iter()
- .zip(ids.iter())
- .zip(names.iter())
- .zip(is_actives.iter())
- .map(|(((pt, id), name), is_active)| {
- (pt.unwrap(), id.unwrap(), name.unwrap(),
is_active.unwrap())
- })
- .collect();
- data.sort_unstable_by_key(|(_, id, _, _)| *id);
- assert_eq!(
- data,
- vec![
- ("byteField=10/shortField=300", 1, "Alice", false),
- ("byteField=10/shortField=300", 3, "Carol", true),
- ]
- );
+ let filters = &[
+ Filter::try_from(("byteField", ">=", "10"))?,
+ Filter::try_from(("byteField", "<", "20"))?,
+ Filter::try_from(("shortField", "!=", "100"))?,
+ ];
+ let records = hudi_table.read_snapshot(filters).await?;
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
+
+ let sample_data =
SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(sample_data, vec![(1, "Alice", false), (3, "Carol",
true),]);
+ }
+ Ok(())
+ }
+ #[tokio::test]
+ async fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
+ for base_url in
&[SampleTable::V6SimplekeygenNonhivestyle.url_to_mor()] {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let commit_timestamps = hudi_table
+ .timeline
+ .completed_commits
+ .iter()
+ .map(|i| i.timestamp.as_str())
+ .collect::<Vec<_>>();
+ let first_commit = commit_timestamps[0];
+ let records = hudi_table.read_snapshot_as_of(first_commit,
&[]).await?;
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
+
+ let sample_data =
SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol",
true),]
+ );
+ }
Ok(())
}
}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index e1fc486..262d05a 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -17,7 +17,7 @@
* under the License.
*/
mod instant;
-mod selector;
+pub(crate) mod selector;
use crate::config::HudiConfigs;
use crate::error::CoreError;
@@ -54,8 +54,8 @@ impl Timeline {
) -> Result<Self> {
let storage = Storage::new(storage_options.clone(),
hudi_configs.clone())?;
Ok(Self {
- storage,
hudi_configs,
+ storage,
completed_commits,
})
}
@@ -68,8 +68,8 @@ impl Timeline {
let selector =
TimelineSelector::completed_commits(hudi_configs.clone())?;
let completed_commits = Self::load_instants(&selector,
&storage).await?;
Ok(Self {
- storage,
hudi_configs,
+ storage,
completed_commits,
})
}
diff --git a/crates/core/src/timeline/selector.rs
b/crates/core/src/timeline/selector.rs
index e49e374..ff117a7 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -25,6 +25,91 @@ use crate::Result;
use chrono::{DateTime, Utc};
use std::sync::Arc;
+#[derive(Debug, Clone)]
+pub struct InstantRange {
+ timezone: String,
+ start_timestamp: Option<String>,
+ end_timestamp: Option<String>,
+ start_inclusive: bool,
+ end_inclusive: bool,
+}
+
+impl InstantRange {
+ pub fn new(
+ timezone: String,
+ start_timestamp: Option<String>,
+ end_timestamp: Option<String>,
+ start_inclusive: bool,
+ end_inclusive: bool,
+ ) -> Self {
+ Self {
+ timezone,
+ start_timestamp,
+ end_timestamp,
+ start_inclusive,
+ end_inclusive,
+ }
+ }
+
+ /// Create a new [InstantRange] with an end timestamp inclusive.
+ pub fn up_to(end_timestamp: &str, timezone: &str) -> Self {
+ Self::new(
+ timezone.to_string(),
+ None,
+ Some(end_timestamp.to_string()),
+ false,
+ true,
+ )
+ }
+
+ pub fn timezone(&self) -> &str {
+ &self.timezone
+ }
+
+ pub fn start_timestamp(&self) -> Result<Option<DateTime<Utc>>> {
+ self.start_timestamp
+ .as_deref()
+ .map(|timestamp| Instant::parse_datetime(timestamp,
&self.timezone))
+ .transpose()
+ }
+
+ pub fn end_timestamp(&self) -> Result<Option<DateTime<Utc>>> {
+ self.end_timestamp
+ .as_deref()
+ .map(|timestamp| Instant::parse_datetime(timestamp,
&self.timezone))
+ .transpose()
+ }
+
+ pub fn is_in_range(&self, timestamp: &str, timezone: &str) -> Result<bool>
{
+ let t = Instant::parse_datetime(timestamp, timezone)?;
+ if let Some(start) = self.start_timestamp()? {
+ if self.start_inclusive {
+ if t < start {
+ return Ok(false);
+ }
+ } else if t <= start {
+ return Ok(false);
+ }
+ }
+
+ if let Some(end) = self.end_timestamp()? {
+ if self.end_inclusive {
+ if t > end {
+ return Ok(false);
+ }
+ } else if t >= end {
+ return Ok(false);
+ }
+ }
+
+ Ok(true)
+ }
+
+ pub fn not_in_range(&self, timestamp: &str, timezone: &str) ->
Result<bool> {
+ Ok(!self.is_in_range(timestamp, timezone)?)
+ }
+}
+
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub struct TimelineSelector {
@@ -38,7 +123,7 @@ pub struct TimelineSelector {
#[allow(dead_code)]
impl TimelineSelector {
- fn get_timezone_from_configs(hudi_configs: Arc<HudiConfigs>) -> String {
+ fn get_timezone_from_configs(hudi_configs: &HudiConfigs) -> String {
hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
.to::<String>()
@@ -53,7 +138,7 @@ impl TimelineSelector {
start: Option<&str>,
end: Option<&str>,
) -> Result<Self> {
- let timezone = Self::get_timezone_from_configs(hudi_configs);
+ let timezone = Self::get_timezone_from_configs(&hudi_configs);
let start_datetime = start
.map(|s| Instant::parse_datetime(s, &timezone))
.transpose()?;
@@ -72,7 +157,7 @@ impl TimelineSelector {
pub fn completed_replacecommits(hudi_configs: Arc<HudiConfigs>) -> Self {
Self {
- timezone: Self::get_timezone_from_configs(hudi_configs),
+ timezone: Self::get_timezone_from_configs(&hudi_configs),
start_datetime: None,
end_datetime: None,
states: vec![State::Completed],
@@ -191,6 +276,130 @@ mod tests {
use std::str::FromStr;
use std::sync::Arc;
+ #[test]
+ fn test_new_instant_range() {
+ let range = InstantRange::new(
+ "UTC".to_string(),
+ Some("20240101000000000".to_string()),
+ Some("20241231235959999".to_string()),
+ true,
+ false,
+ );
+
+ assert_eq!(range.timezone(), "UTC");
+ assert_eq!(range.start_timestamp.as_deref(),
Some("20240101000000000"));
+ assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
+ assert!(range.start_inclusive);
+ assert!(!range.end_inclusive);
+ }
+
+ #[test]
+ fn test_up_to() {
+ let range = InstantRange::up_to("20241231235959999", "UTC");
+
+ assert_eq!(range.timezone(), "UTC");
+ assert!(range.start_timestamp.is_none());
+ assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
+ assert!(!range.start_inclusive);
+ assert!(range.end_inclusive);
+ }
+
+ #[test]
+ fn test_is_in_range_inclusive_bounds() {
+ let range = InstantRange::new(
+ "UTC".to_string(),
+ Some("20240101000000000".to_string()),
+ Some("20241231235959999".to_string()),
+ true,
+ true,
+ );
+
+ // Test exact bounds
+ assert!(range.is_in_range("20240101000000000", "UTC").unwrap());
+ assert!(range.is_in_range("20241231235959999", "UTC").unwrap());
+
+ // Test inside range
+ assert!(range.is_in_range("20240615120000000", "UTC").unwrap());
+
+ // Test outside range
+ assert!(!range.is_in_range("20231231235959999", "UTC").unwrap());
+ assert!(!range.is_in_range("20250101000000000", "UTC").unwrap());
+ }
+
+ #[test]
+ fn test_is_in_range_exclusive_bounds() {
+ let range = InstantRange::new(
+ "UTC".to_string(),
+ Some("20240101000000000".to_string()),
+ Some("20241231235959999".to_string()),
+ false,
+ false,
+ );
+
+ // Test exact bounds
+ assert!(!range.is_in_range("20240101000000000", "UTC").unwrap());
+ assert!(!range.is_in_range("20241231235959999", "UTC").unwrap());
+
+ // Test inside range
+ assert!(range.is_in_range("20240615120000000", "UTC").unwrap());
+ }
+
+ #[test]
+ fn test_not_in_range() {
+ let range = InstantRange::new(
+ "UTC".to_string(),
+ Some("20240101000000000".to_string()),
+ Some("20241231235959999".to_string()),
+ true,
+ true,
+ );
+
+ assert!(!range.not_in_range("20240615120000000", "UTC").unwrap());
+ assert!(range.not_in_range("20231231235959999", "UTC").unwrap());
+ }
+
+ #[test]
+ fn test_invalid_timestamp_format() {
+ let range = InstantRange::new(
+ "UTC".to_string(),
+ Some("20240101000000000".to_string()),
+ Some("20241231235959999".to_string()),
+ true,
+ true,
+ );
+
+ assert!(range.is_in_range("invalid_timestamp", "UTC").is_err());
+ }
+
+ #[test]
+ fn test_invalid_timezone() {
+ let range = InstantRange::new(
+ "Invalid/Timezone".to_string(),
+ Some("20240101000000000".to_string()),
+ Some("20241231235959999".to_string()),
+ true,
+ true,
+ );
+
+ assert!(range.is_in_range("20240615120000000", "UTC").is_err());
+ }
+
+ #[test]
+ fn test_millisecond_precision() {
+ let range = InstantRange::new(
+ "UTC".to_string(),
+ Some("20240101000000000".to_string()),
+ Some("20240101000000999".to_string()),
+ true,
+ true,
+ );
+
+ assert!(range.is_in_range("20240101000000000", "UTC").unwrap());
+ assert!(range.is_in_range("20240101000000500", "UTC").unwrap());
+ assert!(range.is_in_range("20240101000000999", "UTC").unwrap());
+ assert!(!range.is_in_range("20240101000001000", "UTC").unwrap());
+ }
+
fn create_test_selector(
actions: &[Action],
states: &[State],
diff --git a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
new file mode 100644
index 0000000..1c7c3bc
--- /dev/null
+++ b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v6_simplekeygen_nonhivestyle (
+ id INT,
+ name STRING,
+ isActive BOOLEAN,
+ shortField SHORT,
+ intField INT,
+ longField LONG,
+ floatField FLOAT,
+ doubleField DOUBLE,
+ decimalField DECIMAL(10,5),
+ dateField DATE,
+ timestampField TIMESTAMP,
+ binaryField BINARY,
+ arrayField
ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>, -- Array of structs
+ mapField MAP<STRING,
STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>,
-- Map with struct values
+ structField STRUCT<
+ field1: STRING,
+ field2: INT,
+ child_struct: STRUCT<
+ child_field1: DOUBLE,
+ child_field2: BOOLEAN
+ >
+ >,
+ byteField BYTE
+)
+ USING HUDI
+ LOCATION '/opt/data/external_tables/v6_simplekeygen_nonhivestyle'
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ preCombineField = 'longField',
+ 'hoodie.metadata.enable' = 'false',
+ 'hoodie.datasource.write.hive_style_partitioning' = 'false',
+ 'hoodie.datasource.write.drop.partition.columns' = 'false',
+ 'hoodie.table.log.file.format' = 'PARQUET',
+ 'hoodie.logfile.data.block.format' = 'parquet',
+ 'hoodie.datasource.write.record.merger.impls' =
'org.apache.hudi.HoodieSparkRecordMerger',
+ 'hoodie.parquet.small.file.limit' = '0'
+)
+PARTITIONED BY (byteField);
+
+INSERT INTO v6_simplekeygen_nonhivestyle VALUES
+ (1, 'Alice', true, 300, 15000,
1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE),
CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+ ARRAY(STRUCT('red', 100),
STRUCT('blue', 200), STRUCT('green', 300)),
+ MAP('key1', STRUCT(123.456,
true), 'key2', STRUCT(789.012, false)),
+ STRUCT('Alice', 30,
STRUCT(123.456, true)),
+ 10
+ ),
+ (2, 'Bob', false, 100, 25000,
9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE),
CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+ ARRAY(STRUCT('yellow', 400),
STRUCT('purple', 500)),
+ MAP('key3', STRUCT(234.567,
true), 'key4', STRUCT(567.890, false)),
+ STRUCT('Bob', 40,
STRUCT(789.012, false)),
+ 20
+ ),
+ (3, 'Carol', true, 200, 35000,
1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE),
CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS
BINARY),
+ ARRAY(STRUCT('black', 600),
STRUCT('white', 700), STRUCT('pink', 800)),
+ MAP('key5', STRUCT(345.678,
true), 'key6', STRUCT(654.321, false)),
+ STRUCT('Carol', 25,
STRUCT(456.789, true)),
+ 10
+ );
+
+INSERT INTO v6_simplekeygen_nonhivestyle VALUES
+ (1, 'Alice', false, 300, 15000,
1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE),
CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+ ARRAY(STRUCT('red', 100),
STRUCT('blue', 200), STRUCT('green', 300)),
+ MAP('key1', STRUCT(123.456,
true), 'key2', STRUCT(789.012, false)),
+ STRUCT('Alice', 30,
STRUCT(123.456, true)),
+ 10
+ ),
+ (4, 'Diana', true, 500, 45000,
987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE),
CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+ ARRAY(STRUCT('orange', 900),
STRUCT('gray', 1000)),
+ MAP('key7', STRUCT(456.789,
true), 'key8', STRUCT(123.456, false)),
+ STRUCT('Diana', 50,
STRUCT(987.654, true)),
+ 30
+ );
diff --git a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.zip
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.zip
new file mode 100644
index 0000000..aef8e72
Binary files /dev/null and
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.zip differ
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index a7b1e7b..c5562d2 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -17,10 +17,10 @@
* under the License.
*/
+use arrow_array::{BooleanArray, Int32Array, RecordBatch, StringArray};
use std::fs;
use std::io::Cursor;
use std::path::{Path, PathBuf};
-
use strum_macros::{AsRefStr, EnumIter, EnumString};
use tempfile::tempdir;
use url::Url;
@@ -34,6 +34,7 @@ pub fn extract_test_table(zip_path: &Path) -> PathBuf {
target_dir
}
+#[allow(dead_code)]
#[derive(Debug, EnumString, AsRefStr, EnumIter)]
#[strum(serialize_all = "snake_case")]
pub enum SampleTable {
@@ -47,6 +48,37 @@ pub enum SampleTable {
}
impl SampleTable {
+ /// Return rows of columns (id, name, isActive) for the given
[RecordBatch] order by id.
+ pub fn sample_data_order_by_id(record_batch: &RecordBatch) -> Vec<(i32,
&str, bool)> {
+ let ids = record_batch
+ .column_by_name("id")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let names = record_batch
+ .column_by_name("name")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ let is_actives = record_batch
+ .column_by_name("isActive")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap();
+
+ let mut data: Vec<(i32, &str, bool)> = ids
+ .iter()
+ .zip(names.iter())
+ .zip(is_actives.iter())
+ .map(|((id, name), is_active)| (id.unwrap(), name.unwrap(),
is_active.unwrap()))
+ .collect();
+ data.sort_unstable_by_key(|(id, _, _)| *id);
+ data
+ }
+
fn zip_path(&self, table_type: &str) -> Box<Path> {
let dir = env!("CARGO_MANIFEST_DIR");
let data_path = Path::new(dir)
@@ -68,6 +100,10 @@ impl SampleTable {
path_buf.to_str().unwrap().to_string()
}
+ pub fn paths(&self) -> Vec<String> {
+ vec![self.path_to_cow(), self.path_to_mor()]
+ }
+
pub fn url_to_cow(&self) -> Url {
let path = self.path_to_cow();
Url::from_file_path(path).unwrap()
@@ -77,6 +113,10 @@ impl SampleTable {
let path = self.path_to_mor();
Url::from_file_path(path).unwrap()
}
+
+ pub fn urls(&self) -> Vec<Url> {
+ vec![self.url_to_cow(), self.url_to_mor()]
+ }
}
#[cfg(test)]