This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4fa2440 feat: support time travel with read option (#52)
4fa2440 is described below
commit 4fa2440985fd66ec67aebac21f4df74e8f32821e
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Jul 6 20:36:17 2024 -0500
feat: support time travel with read option (#52)
Support passing the option `hoodie.read.as.of.timestamp` to perform a time
travel query.
Fixes #39
---
crates/core/src/config/mod.rs | 13 +++++++++++-
crates/core/src/config/read.rs | 26 +++++++++---------------
crates/core/src/table/mod.rs | 45 ++++++++++++++++++++++++++---------------
crates/datafusion/src/lib.rs | 19 +++++++++--------
python/src/lib.rs | 5 -----
python/tests/test_table_read.py | 4 +++-
6 files changed, 62 insertions(+), 50 deletions(-)
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index f8975cf..2b37dc7 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -59,7 +59,7 @@ pub trait ConfigParser: AsRef<str> {
}
}
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub enum HudiConfigValue {
Boolean(bool),
Integer(isize),
@@ -157,4 +157,15 @@ impl HudiConfigs {
) -> HudiConfigValue {
parser.parse_value_or_default(&self.raw_configs)
}
+
+ pub fn try_get(
+ &self,
+ parser: impl ConfigParser<Output = HudiConfigValue>,
+ ) -> Option<HudiConfigValue> {
+ let res = parser.parse_value(&self.raw_configs);
+ match res {
+ Ok(v) => Some(v),
+ Err(_) => parser.default_value(),
+ }
+ }
}
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 2af57e8..195aa70 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -27,12 +27,14 @@ use strum_macros::EnumIter;
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
InputPartitions,
+ AsOfTimestamp,
}
impl AsRef<str> for HudiReadConfig {
fn as_ref(&self) -> &str {
match self {
Self::InputPartitions => "hoodie.read.input.partitions",
+ Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
}
}
}
@@ -43,6 +45,7 @@ impl ConfigParser for HudiReadConfig {
fn default_value(&self) -> Option<HudiConfigValue> {
match self {
HudiReadConfig::InputPartitions =>
Some(HudiConfigValue::UInteger(0usize)),
+ _ => None,
}
}
@@ -53,28 +56,20 @@ impl ConfigParser for HudiReadConfig {
.ok_or(anyhow!("Config '{}' not found", self.as_ref()));
match self {
- HudiReadConfig::InputPartitions => get_result
- .and_then(|v| {
- usize::from_str(v).map_err(|e| {
- anyhow!(
- "Failed to parse '{}' for config '{}': {}",
- v,
- self.as_ref(),
- e
- )
- })
- })
+ Self::InputPartitions => get_result
+ .and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e)))
.map(HudiConfigValue::UInteger),
+ Self::AsOfTimestamp => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
}
}
}
#[cfg(test)]
mod tests {
- use std::collections::HashMap;
-
use crate::config::read::HudiReadConfig::InputPartitions;
use crate::config::ConfigParser;
+ use std::collections::HashMap;
+ use std::num::ParseIntError;
#[test]
fn parse_valid_config_value() {
@@ -87,10 +82,7 @@ mod tests {
fn parse_invalid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(),
"foo".to_string())]);
let value = InputPartitions.parse_value(&options);
- assert!(value.err().unwrap().to_string().starts_with(&format!(
- "Failed to parse 'foo' for config '{}'",
- InputPartitions.as_ref()
- )));
+ assert!(value.err().unwrap().is::<ParseIntError>());
assert_eq!(
InputPartitions
.parse_value_or_default(&options)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index db50525..bbc5673 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -34,6 +34,7 @@ use TableTypeValue::CopyOnWrite;
use crate::config::internal::HudiInternalConfig;
use crate::config::read::HudiReadConfig;
+use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::file_group::FileSlice;
@@ -170,14 +171,17 @@ impl Table {
}
pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
- if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+ self.get_file_slices_as_of(timestamp.to::<String>().as_str())
+ .await
+ } else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
self.get_file_slices_as_of(timestamp).await
} else {
Ok(Vec::new())
}
}
- pub async fn get_file_slices_as_of(&self, timestamp: &str) ->
Result<Vec<FileSlice>> {
+ async fn get_file_slices_as_of(&self, timestamp: &str) ->
Result<Vec<FileSlice>> {
self.file_system_view
.load_file_slices_stats_as_of(timestamp)
.await
@@ -186,14 +190,17 @@ impl Table {
}
pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
- if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+ self.read_snapshot_as_of(timestamp.to::<String>().as_str())
+ .await
+ } else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
self.read_snapshot_as_of(timestamp).await
} else {
Ok(Vec::new())
}
}
- pub async fn read_snapshot_as_of(&self, timestamp: &str) ->
Result<Vec<RecordBatch>> {
+ async fn read_snapshot_as_of(&self, timestamp: &str) ->
Result<Vec<RecordBatch>> {
let file_slices = self
.get_file_slices_as_of(timestamp)
.await
@@ -233,8 +240,7 @@ mod tests {
use url::Url;
- use hudi_tests::{assert_not, TestTable};
-
+ use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::HudiTableConfig::{
BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields,
IsHiveStylePartitioning,
IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields,
PopulatesMetaFields,
@@ -243,6 +249,7 @@ mod tests {
};
use crate::storage::utils::join_url_segments;
use crate::table::Table;
+ use hudi_tests::{assert_not, TestTable};
#[tokio::test]
async fn hudi_table_get_schema() {
@@ -331,8 +338,8 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = TestTable::V6Nonpartitioned.url();
- let hudi_table = Table::new(base_url.path(),
HashMap::new()).await.unwrap();
+ let hudi_table = Table::new(base_url.path(),
HashMap::new()).await.unwrap();
let file_slices = hudi_table.get_file_slices().await.unwrap();
assert_eq!(
file_slices
@@ -343,10 +350,12 @@ mod tests {
);
// as of the latest timestamp
- let file_slices = hudi_table
- .get_file_slices_as_of("20240418173551906")
- .await
- .unwrap();
+ let opts = HashMap::from_iter(vec![(
+ AsOfTimestamp.as_ref().to_string(),
+ "20240418173551906".to_string(),
+ )]);
+ let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+ let file_slices = hudi_table.get_file_slices().await.unwrap();
assert_eq!(
file_slices
.iter()
@@ -356,10 +365,12 @@ mod tests {
);
// as of just smaller than the latest timestamp
- let file_slices = hudi_table
- .get_file_slices_as_of("20240418173551905")
- .await
- .unwrap();
+ let opts = HashMap::from_iter(vec![(
+ AsOfTimestamp.as_ref().to_string(),
+ "20240418173551905".to_string(),
+ )]);
+ let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+ let file_slices = hudi_table.get_file_slices().await.unwrap();
assert_eq!(
file_slices
.iter()
@@ -369,7 +380,9 @@ mod tests {
);
// as of non-exist old timestamp
- let file_slices = hudi_table.get_file_slices_as_of("0").await.unwrap();
+ let opts =
HashMap::from_iter(vec![(AsOfTimestamp.as_ref().to_string(), "0".to_string())]);
+ let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+ let file_slices = hudi_table.get_file_slices().await.unwrap();
assert_eq!(
file_slices
.iter()
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 1435d94..f7a2c43 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -39,29 +39,28 @@ use datafusion_physical_expr::create_physical_expr;
use DataFusionError::Execution;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::config::ConfigParser;
use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
use hudi_core::HudiTable;
#[derive(Clone, Debug)]
pub struct HudiDataSource {
table: Arc<HudiTable>,
- input_partitions: usize,
}
impl HudiDataSource {
pub async fn new(base_uri: &str, options: HashMap<String, String>) ->
Result<Self> {
- let input_partitions = InputPartitions
- .parse_value_or_default(&options)
- .to::<usize>();
match HudiTable::new(base_uri, options).await {
- Ok(t) => Ok(Self {
- table: Arc::new(t),
- input_partitions,
- }),
+ Ok(t) => Ok(Self { table: Arc::new(t) }),
Err(e) => Err(Execution(format!("Failed to create Hudi table: {}",
e))),
}
}
+
+ fn get_input_partitions(&self) -> usize {
+ self.table
+ .configs
+ .get_or_default(InputPartitions)
+ .to::<usize>()
+ }
}
#[async_trait]
@@ -92,7 +91,7 @@ impl TableProvider for HudiDataSource {
) -> Result<Arc<dyn ExecutionPlan>> {
let file_slices = self
.table
- .split_file_slices(self.input_partitions)
+ .split_file_slices(self.get_input_partitions())
.await
.map_err(|e| Execution(format!("Failed to get file slices from
Hudi table: {}", e)))?;
let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8962cff..745b457 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -129,11 +129,6 @@ impl BindingHudiTable {
pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
}
-
- pub fn read_snapshot_as_of(&self, timestamp: &str, py: Python) ->
PyResult<PyObject> {
- rt().block_on(self._table.read_snapshot_as_of(timestamp))?
- .to_pyarrow(py)
- }
}
#[cfg(not(tarpaulin))]
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 6370bd5..61195fb 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -65,7 +65,9 @@ def test_sample_table(get_sample_table):
{'_hoodie_commit_time': '20240402123035233',
'ts': 1695516137016,
'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c',
'fare': 34.15}]
- batches = table.read_snapshot_as_of("20240402123035233")
+ table = HudiTable(table_path, {
+ "hoodie.read.as.of.timestamp": "20240402123035233"})
+ batches = table.read_snapshot()
t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
assert t.to_pylist() == [{'_hoodie_commit_time': '20240402123035233',
'ts': 1695046462179,
'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00',
'fare': 33.9},