This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 2ef9557 refactor: improve APIs for handling options (#161)
2ef9557 is described below
commit 2ef95573c7ce6ec2937f5a3724b1bb4c426533be
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Oct 11 21:39:21 2024 -1000
refactor: improve APIs for handling options (#161)
- Refactor Config APIs to handle options better
- Refactor Table and Storage APIs to improve option passing
- Add a new table config `hoodie.base.path` to carry `base_uri` info
- Improve variable names
- When we say configs, it should refer to `HudiConfigs`
- Let's call string configs in key-value pairs "options"; they can be
storage options, hudi options, etc.
---
crates/core/src/config/mod.rs | 114 ++++++++++++++++++++--
crates/core/src/config/table.rs | 7 +-
crates/core/src/config/utils.rs | 192 +++++++++++++++++++++++++++++++++++++
crates/core/src/file_group/mod.rs | 8 +-
crates/core/src/storage/mod.rs | 102 ++++++++++++++------
crates/core/src/storage/utils.rs | 33 +------
crates/core/src/table/fs_view.rs | 61 ++++++------
crates/core/src/table/mod.rs | 98 ++++++++++---------
crates/core/src/table/partition.rs | 14 +--
crates/core/src/table/timeline.rs | 44 ++++-----
crates/datafusion/src/lib.rs | 13 ++-
python/hudi/_internal.pyi | 2 +-
python/src/internal.rs | 39 ++++----
13 files changed, 512 insertions(+), 215 deletions(-)
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 2893c19..580be44 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -21,11 +21,14 @@ use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;
+use crate::storage::utils::parse_uri;
use anyhow::Result;
+use url::Url;
pub mod internal;
pub mod read;
pub mod table;
+pub mod utils;
pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR";
@@ -92,6 +95,17 @@ impl HudiConfigValue {
pub fn to<T: 'static + std::fmt::Debug + From<HudiConfigValue>>(self) -> T
{
T::from(self)
}
+
+ pub fn to_url(self) -> Result<Url> {
+ match self {
+ HudiConfigValue::String(v) => parse_uri(&v),
+ _ => panic!(
+ "Cannot cast {:?} to {}",
+ type_name::<Self>(),
+ type_name::<Url>()
+ ),
+ }
+ }
}
impl From<HudiConfigValue> for bool {
@@ -145,33 +159,61 @@ impl From<HudiConfigValue> for Vec<String> {
/// Hudi configuration container.
#[derive(Clone, Debug)]
pub struct HudiConfigs {
- pub raw_configs: Arc<HashMap<String, String>>,
+ raw_options: Arc<HashMap<String, String>>,
+}
+
+impl From<HashMap<String, String>> for HudiConfigs {
+ fn from(options: HashMap<String, String>) -> Self {
+ Self {
+ raw_options: Arc::new(options),
+ }
+ }
}
impl HudiConfigs {
- /// Create [HudiConfigs] with key-value pairs of [String]s.
- pub fn new(raw_configs: HashMap<String, String>) -> Self {
+    /// Create [HudiConfigs] using options in the form of key-value pairs.
+ pub fn new<I, K, V>(options: I) -> Self
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: AsRef<str>,
+ {
+ let raw_options = options
+ .into_iter()
+ .map(|(k, v)| (k.as_ref().to_string(), v.as_ref().to_string()))
+ .collect();
Self {
- raw_configs: Arc::new(raw_configs),
+ raw_options: Arc::new(raw_options),
}
}
- /// Create empty [HudiConfigs].
+ /// Create an empty [HudiConfigs].
pub fn empty() -> Self {
Self {
- raw_configs: Arc::new(HashMap::new()),
+ raw_options: Arc::new(HashMap::new()),
}
}
+ /// Create a deep-copy of the configs as [String] options in the form of
key-value pairs.
+ pub fn as_options(&self) -> HashMap<String, String> {
+ self.raw_options.as_ref().clone()
+ }
+
pub fn validate(&self, parser: impl ConfigParser<Output =
HudiConfigValue>) -> Result<()> {
- parser.validate(&self.raw_configs)
+ parser.validate(&self.raw_options)
+ }
+
+ pub fn contains(&self, key: impl AsRef<str>) -> bool {
+ self.raw_options.contains_key(key.as_ref())
}
+ /// Get value for the given config. Return [Result] with the value.
+ /// If the config is not found or value was not parsed properly, return
[Err].
pub fn get(
&self,
parser: impl ConfigParser<Output = HudiConfigValue>,
) -> Result<HudiConfigValue> {
- parser.parse_value(&self.raw_configs)
+ parser.parse_value(&self.raw_options)
}
/// Get value or default value. If the config has no default value, this
will panic.
@@ -179,7 +221,7 @@ impl HudiConfigs {
&self,
parser: impl ConfigParser<Output = HudiConfigValue>,
) -> HudiConfigValue {
- parser.parse_value_or_default(&self.raw_configs)
+ parser.parse_value_or_default(&self.raw_options)
}
/// Get value or default value. If the config has no default value, this
will return [None].
@@ -187,10 +229,62 @@ impl HudiConfigs {
&self,
parser: impl ConfigParser<Output = HudiConfigValue>,
) -> Option<HudiConfigValue> {
- let res = parser.parse_value(&self.raw_configs);
+ let res = parser.parse_value(&self.raw_options);
match res {
Ok(v) => Some(v),
Err(_) => parser.default_value(),
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_from_hashmap() {
+ let mut options = HashMap::new();
+ options.insert("key1".to_string(), "value1".to_string());
+ options.insert("key2".to_string(), "value2".to_string());
+
+ let config = HudiConfigs::from(options.clone());
+
+ assert_eq!(*config.raw_options, options);
+ }
+
+ #[test]
+ fn test_new() {
+ let options = vec![("key1", "value1"), ("key2", "value2")];
+
+ let config = HudiConfigs::new(options);
+
+ let expected: HashMap<String, String> = vec![
+ ("key1".to_string(), "value1".to_string()),
+ ("key2".to_string(), "value2".to_string()),
+ ]
+ .into_iter()
+ .collect();
+
+ assert_eq!(*config.raw_options, expected);
+ }
+
+ #[test]
+ fn test_empty() {
+ let config = HudiConfigs::empty();
+
+ assert!(config.raw_options.is_empty());
+ }
+
+ #[test]
+ fn test_as_options() {
+ let mut options = HashMap::new();
+ options.insert("key1".to_string(), "value1".to_string());
+ options.insert("key2".to_string(), "value2".to_string());
+
+ let config = HudiConfigs::from(options.clone());
+
+ let result = config.as_options();
+
+ assert_eq!(result, options);
+ }
+}
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 8a5d8d6..107e382 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -27,7 +27,7 @@ use strum_macros::{AsRefStr, EnumIter};
use crate::config::{ConfigParser, HudiConfigValue};
-/// Configurations for Hudi tables, persisted in `hoodie.properties`.
+/// Configurations for Hudi tables, most of them are persisted in
`hoodie.properties`.
///
/// **Example**
///
@@ -47,6 +47,9 @@ pub enum HudiTableConfig {
/// Currently only parquet is supported.
BaseFileFormat,
+ /// Base path to the table.
+ BasePath,
+
/// Table checksum is used to guard against partial writes in HDFS.
/// It is added as the last entry in hoodie.properties and then used to
validate while reading table config.
Checksum,
@@ -105,6 +108,7 @@ impl AsRef<str> for HudiTableConfig {
fn as_ref(&self) -> &str {
match self {
Self::BaseFileFormat => "hoodie.table.base.file.format",
+ Self::BasePath => "hoodie.base.path",
Self::Checksum => "hoodie.table.checksum",
Self::DatabaseName => "hoodie.database.name",
Self::DropsPartitionFields =>
"hoodie.datasource.write.drop.partition.columns",
@@ -150,6 +154,7 @@ impl ConfigParser for HudiTableConfig {
Self::BaseFileFormat => get_result
.and_then(BaseFileFormatValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
+ Self::BasePath => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::Checksum => get_result
.and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
.map(HudiConfigValue::Integer),
diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/utils.rs
new file mode 100644
index 0000000..5fd84cd
--- /dev/null
+++ b/crates/core/src/config/utils.rs
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use anyhow::{Context, Result};
+use bytes::Bytes;
+use std::collections::HashMap;
+use std::io::{BufRead, BufReader, Cursor};
+
+pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
+ std::iter::empty::<(&str, &str)>()
+}
+
+pub fn split_hudi_options_from_others<I, K, V>(
+ all_options: I,
+) -> (HashMap<String, String>, HashMap<String, String>)
+where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+{
+ let mut hudi_options = HashMap::new();
+ let mut others = HashMap::new();
+ for (k, v) in all_options {
+ if k.as_ref().starts_with("hoodie.") {
+ hudi_options.insert(k.as_ref().to_string(), v.into());
+ } else {
+ others.insert(k.as_ref().to_string(), v.into());
+ }
+ }
+
+ (hudi_options, others)
+}
+
+pub fn parse_data_for_options(data: &Bytes, split_chars: &str) ->
Result<HashMap<String, String>> {
+ let cursor = Cursor::new(data);
+ let lines = BufReader::new(cursor).lines();
+ let mut options = HashMap::new();
+
+ for line in lines {
+ let line = line.context("Failed to read line")?;
+ let trimmed_line = line.trim();
+ if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
+ continue;
+ }
+ let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
+ let key = parts
+ .next()
+ .context("Missing key in config line")?
+ .trim()
+ .to_owned();
+ let value = parts.next().unwrap_or("").trim().to_owned();
+ options.insert(key, value);
+ }
+
+ Ok(options)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_empty_options() {
+ let empty = empty_options();
+ assert_eq!(empty.count(), 0);
+ }
+
+ #[test]
+ fn test_split_hudi_options_from_others_empty() {
+ let (hudi, others) = split_hudi_options_from_others(Vec::<(&str,
&str)>::new());
+ assert!(hudi.is_empty());
+ assert!(others.is_empty());
+ }
+
+ #[test]
+ fn test_split_hudi_options_from_others_mixed() {
+ let options = vec![
+ ("hoodie.option1", "value1"),
+ ("option2", "value2"),
+ ("hoodie.option3", "value3"),
+ ("option4", "value4"),
+ ];
+ let (hudi, others) = split_hudi_options_from_others(options);
+ assert_eq!(hudi.len(), 2);
+ assert_eq!(hudi["hoodie.option1"], "value1");
+ assert_eq!(hudi["hoodie.option3"], "value3");
+ assert_eq!(others.len(), 2);
+ assert_eq!(others["option2"], "value2");
+ assert_eq!(others["option4"], "value4");
+ }
+
+ #[test]
+ fn test_parse_data_comprehensive() {
+ let data = Bytes::from(
+ "key1=value1\n\
+ key2:value2\n\
+ key3 value3\n\
+ key4\tvalue4\n\
+ key5=value with spaces\n\
+ key6: another spaced value \n\
+ key7\tvalue with tab\n\
+ key8=value with = in it\n\
+ key9:value:with:colons\n\
+ empty_key=\n\
+ #comment line\n\
+ \n\
+ key10==double equals",
+ );
+
+ let result = parse_data_for_options(&data, "=: \t").unwrap();
+
+ let mut expected = HashMap::new();
+ expected.insert("key1".to_string(), "value1".to_string());
+ expected.insert("key2".to_string(), "value2".to_string());
+ expected.insert("key3".to_string(), "value3".to_string());
+ expected.insert("key4".to_string(), "value4".to_string());
+ expected.insert("key5".to_string(), "value with spaces".to_string());
+ expected.insert("key6".to_string(), "another spaced
value".to_string());
+ expected.insert("key7".to_string(), "value with tab".to_string());
+ expected.insert("key8".to_string(), "value with = in it".to_string());
+ expected.insert("key9".to_string(), "value:with:colons".to_string());
+ expected.insert("empty_key".to_string(), "".to_string());
+ expected.insert("key10".to_string(), "=double equals".to_string());
+
+ assert_eq!(result, expected);
+ }
+
+ #[test]
+ fn test_parse_data_with_comments_and_empty_lines() {
+ let data = Bytes::from("key1=value1\n# This is a
comment\n\nkey2:value2");
+ let result = parse_data_for_options(&data, "=:").unwrap();
+ let mut expected = HashMap::new();
+ expected.insert("key1".to_string(), "value1".to_string());
+ expected.insert("key2".to_string(), "value2".to_string());
+ assert_eq!(result, expected);
+ }
+
+ #[test]
+ fn test_parse_data_with_missing_value() {
+ let data = Bytes::from("key1=value1\nkey2=\nkey3:value3");
+ let result = parse_data_for_options(&data, "=:").unwrap();
+ let mut expected = HashMap::new();
+ expected.insert("key1".to_string(), "value1".to_string());
+ expected.insert("key2".to_string(), "".to_string());
+ expected.insert("key3".to_string(), "value3".to_string());
+ assert_eq!(result, expected);
+ }
+
+ #[test]
+ fn test_parse_data_with_trailing_spaces() {
+ let data = Bytes::from("key1 = value1 \n key2: value2 ");
+ let result = parse_data_for_options(&data, "=:").unwrap();
+ let mut expected = HashMap::new();
+ expected.insert("key1".to_string(), "value1".to_string());
+ expected.insert("key2".to_string(), "value2".to_string());
+ assert_eq!(result, expected);
+ }
+
+ #[test]
+ fn test_parse_data_empty_input() {
+ let data = Bytes::from("");
+ let result = parse_data_for_options(&data, "=:").unwrap();
+ assert!(result.is_empty());
+ }
+
+ #[test]
+ fn test_parse_data_invalid_line() {
+ let data = Bytes::from("key1=value1\ninvalid_line\nkey2=value2");
+ let result = parse_data_for_options(&data, "=").unwrap();
+ let mut expected = HashMap::new();
+ expected.insert("key1".to_string(), "value1".to_string());
+ expected.insert("invalid_line".to_string(), "".to_string());
+ expected.insert("key2".to_string(), "value2".to_string());
+ assert_eq!(result, expected);
+ }
+}
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index 7cd1f47..4605052 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -204,20 +204,20 @@ impl FileGroup {
pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> {
let as_of = timestamp.to_string();
- return if let Some((_, file_slice)) =
self.file_slices.range(..=as_of).next_back() {
+ if let Some((_, file_slice)) =
self.file_slices.range(..=as_of).next_back() {
Some(file_slice)
} else {
None
- };
+ }
}
pub fn get_file_slice_mut_as_of(&mut self, timestamp: &str) -> Option<&mut
FileSlice> {
let as_of = timestamp.to_string();
- return if let Some((_, file_slice)) =
self.file_slices.range_mut(..=as_of).next_back() {
+ if let Some((_, file_slice)) =
self.file_slices.range_mut(..=as_of).next_back() {
Some(file_slice)
} else {
None
- };
+ }
}
}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 9985d0c..45c0699 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -22,6 +22,10 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
+use crate::storage::file_info::FileInfo;
+use crate::storage::utils::join_url_segments;
use anyhow::{anyhow, Context, Result};
use arrow::compute::concat_batches;
use arrow::record_batch::RecordBatch;
@@ -35,30 +39,57 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::ParquetMetaData;
use url::Url;
-use crate::storage::file_info::FileInfo;
-use crate::storage::utils::join_url_segments;
-
pub mod file_info;
pub mod file_stats;
pub mod utils;
+#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct Storage {
- base_url: Arc<Url>,
- object_store: Arc<dyn ObjectStore>,
+ pub(crate) base_url: Arc<Url>,
+ pub(crate) object_store: Arc<dyn ObjectStore>,
+ pub(crate) options: Arc<HashMap<String, String>>,
+ pub(crate) hudi_configs: Arc<HudiConfigs>,
}
impl Storage {
- pub fn new(base_url: Arc<Url>, options: &HashMap<String, String>) ->
Result<Arc<Storage>> {
- match parse_url_opts(&base_url, options) {
+ pub fn new(
+ options: Arc<HashMap<String, String>>,
+ hudi_configs: Arc<HudiConfigs>,
+ ) -> Result<Arc<Storage>> {
+ if !hudi_configs.contains(HudiTableConfig::BasePath) {
+ return Err(anyhow!(
+ "Failed to create storage: {} is required.",
+ HudiTableConfig::BasePath.as_ref()
+ ));
+ }
+
+ let base_url = hudi_configs.get(HudiTableConfig::BasePath)?.to_url()?;
+
+ match parse_url_opts(&base_url, options.as_ref()) {
Ok((object_store, _)) => Ok(Arc::new(Storage {
- base_url,
+ base_url: Arc::new(base_url),
object_store: Arc::new(object_store),
+ options,
+ hudi_configs,
})),
Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
}
}
+ #[cfg(test)]
+ pub fn new_with_base_url(base_url: Url) -> Result<Arc<Storage>> {
+ let mut hudi_options = HashMap::new();
+ hudi_options.insert(
+ HudiTableConfig::BasePath.as_ref().to_string(),
+ base_url.as_str().to_string(),
+ );
+ Self::new(
+ Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::new(hudi_options)),
+ )
+ }
+
#[cfg(feature = "datafusion")]
pub fn register_object_store(
&self,
@@ -220,17 +251,32 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir:
Option<&str>) -> Result<Ve
#[cfg(test)]
mod tests {
- use std::collections::{HashMap, HashSet};
+ use super::*;
+ use std::collections::HashSet;
use std::fs::canonicalize;
use std::path::Path;
- use std::sync::Arc;
-
- use object_store::path::Path as ObjPath;
- use url::Url;
use crate::storage::file_info::FileInfo;
use crate::storage::utils::join_url_segments;
use crate::storage::{get_leaf_dirs, Storage};
+ use object_store::path::Path as ObjPath;
+ use url::Url;
+
+ #[test]
+ fn test_storage_new_error_no_base_path() {
+ let options = Arc::new(HashMap::new());
+ let hudi_configs = Arc::new(HudiConfigs::empty());
+ let result = Storage::new(options, hudi_configs);
+
+ assert!(
+ result.is_err(),
+ "Should return error when no base path is provided."
+ );
+ assert!(result
+ .unwrap_err()
+ .to_string()
+ .contains("Failed to create storage"));
+ }
#[tokio::test]
async fn storage_list_dirs() {
@@ -238,7 +284,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let first_level_dirs: HashSet<String> =
storage.list_dirs(None).await.unwrap().into_iter().collect();
assert_eq!(
@@ -260,7 +306,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let first_level_dirs: HashSet<ObjPath> = storage
.list_dirs_as_obj_paths(None)
.await
@@ -283,7 +329,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let file_info_1: Vec<FileInfo> = storage
.list_files(None)
.await
@@ -293,7 +339,9 @@ mod tests {
assert_eq!(
file_info_1,
vec![FileInfo {
- uri: storage.base_url.join("a.parquet").unwrap().to_string(),
+ uri: join_url_segments(&storage.base_url, &["a.parquet"])
+ .unwrap()
+ .to_string(),
name: "a.parquet".to_string(),
size: 0,
}]
@@ -307,9 +355,7 @@ mod tests {
assert_eq!(
file_info_2,
vec![FileInfo {
- uri: storage
- .base_url
- .join("part1/b.parquet")
+ uri: join_url_segments(&storage.base_url, &["part1/b.parquet"])
.unwrap()
.to_string(),
name: "b.parquet".to_string(),
@@ -325,9 +371,7 @@ mod tests {
assert_eq!(
file_info_3,
vec![FileInfo {
- uri: storage
- .base_url
- .join("part2/part22/c.parquet")
+ uri: join_url_segments(&storage.base_url,
&["part2/part22/c.parquet"])
.unwrap()
.to_string(),
name: "c.parquet".to_string(),
@@ -342,7 +386,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
assert_eq!(
leaf_dirs,
@@ -355,7 +399,7 @@ mod tests {
let base_url =
Url::from_directory_path(canonicalize(Path::new("tests/data/leaf_dir")).unwrap())
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
assert_eq!(
leaf_dirs,
@@ -368,12 +412,14 @@ mod tests {
async fn storage_get_file_info() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let file_info = storage.get_file_info("a.parquet").await.unwrap();
assert_eq!(file_info.name, "a.parquet");
assert_eq!(
file_info.uri,
- storage.base_url.join("a.parquet").unwrap().as_ref()
+ join_url_segments(&storage.base_url, &["a.parquet"])
+ .unwrap()
+ .to_string()
);
assert_eq!(file_info.size, 866);
}
@@ -382,7 +428,7 @@ mod tests {
async fn storage_get_parquet_file_data() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let file_data =
storage.get_parquet_file_data("a.parquet").await.unwrap();
assert_eq!(file_data.num_rows(), 5);
}
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
index 80c86c6..ba670f2 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -16,13 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-use std::collections::HashMap;
-use std::io::{BufRead, BufReader, Cursor};
use std::path::{Path, PathBuf};
use std::str::FromStr;
-use anyhow::{anyhow, Context, Result};
-use bytes::Bytes;
+use anyhow::{anyhow, Result};
use url::{ParseError, Url};
pub fn split_filename(filename: &str) -> Result<(String, String)> {
@@ -78,34 +75,6 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str])
-> Result<Url> {
Ok(url)
}
-pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
- std::iter::empty::<(&str, &str)>()
-}
-
-pub async fn parse_config_data(data: &Bytes, split_chars: &str) ->
Result<HashMap<String, String>> {
- let cursor = Cursor::new(data);
- let lines = BufReader::new(cursor).lines();
- let mut configs = HashMap::new();
-
- for line in lines {
- let line = line.context("Failed to read line")?;
- let trimmed_line = line.trim();
- if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
- continue;
- }
- let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
- let key = parts
- .next()
- .context("Missing key in config line")?
- .trim()
- .to_owned();
- let value = parts.next().unwrap_or("").trim().to_owned();
- configs.insert(key, value);
- }
-
- Ok(configs)
-}
-
#[cfg(test)]
mod tests {
use std::str::FromStr;
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 4172f12..0cc6ad1 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -29,26 +29,24 @@ use anyhow::Result;
use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};
-use url::Url;
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct FileSystemView {
- configs: Arc<HudiConfigs>,
+ hudi_configs: Arc<HudiConfigs>,
pub(crate) storage: Arc<Storage>,
partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
}
impl FileSystemView {
pub async fn new(
- base_url: Arc<Url>,
+ hudi_configs: Arc<HudiConfigs>,
storage_options: Arc<HashMap<String, String>>,
- configs: Arc<HudiConfigs>,
) -> Result<Self> {
- let storage = Storage::new(base_url, &storage_options)?;
+ let storage = Storage::new(storage_options.clone(),
hudi_configs.clone())?;
let partition_to_file_groups = Arc::new(DashMap::new());
Ok(FileSystemView {
- configs,
+ hudi_configs,
storage,
partition_to_file_groups,
})
@@ -189,20 +187,30 @@ impl FileSystemView {
#[cfg(test)]
mod tests {
- use hudi_tests::TestTable;
- use std::collections::{HashMap, HashSet};
- use std::sync::Arc;
-
+ use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::Table;
+ use hudi_tests::TestTable;
+ use std::collections::{HashMap, HashSet};
+ use std::sync::Arc;
+ use url::Url;
+
+ async fn create_test_fs_view(base_url: Url) -> FileSystemView {
+ FileSystemView::new(
+ Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath,
base_url)])),
+ Arc::new(HashMap::new()),
+ )
+ .await
+ .unwrap()
+ }
#[tokio::test]
async fn get_partition_paths_for_nonpartitioned_table() {
let base_url = TestTable::V6Nonpartitioned.url();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let partition_pruner = PartitionPruner::empty();
let partition_paths = FileSystemView::load_partition_paths(&storage,
&partition_pruner)
.await
@@ -215,7 +223,7 @@ mod tests {
#[tokio::test]
async fn get_partition_paths_for_complexkeygen_table() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
- let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
let partition_pruner = PartitionPruner::empty();
let partition_paths = FileSystemView::load_partition_paths(&storage,
&partition_pruner)
.await
@@ -235,13 +243,7 @@ mod tests {
#[tokio::test]
async fn fs_view_get_latest_file_slices() {
let base_url = TestTable::V6Nonpartitioned.url();
- let fs_view = FileSystemView::new(
- Arc::new(base_url),
- Arc::new(HashMap::new()),
- Arc::new(HudiConfigs::empty()),
- )
- .await
- .unwrap();
+ let fs_view = create_test_fs_view(base_url).await;
assert!(fs_view.partition_to_file_groups.is_empty());
let partition_pruner = PartitionPruner::empty();
@@ -266,13 +268,7 @@ mod tests {
async fn fs_view_get_latest_file_slices_with_replace_commit() {
let base_url =
TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let fs_view = FileSystemView::new(
- Arc::new(base_url),
- Arc::new(HashMap::new()),
- Arc::new(HudiConfigs::empty()),
- )
- .await
- .unwrap();
+ let fs_view = create_test_fs_view(base_url).await;
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
let partition_pruner = PartitionPruner::empty();
@@ -301,14 +297,10 @@ mod tests {
async fn fs_view_get_latest_file_slices_with_partition_filters() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let fs_view = FileSystemView::new(
- Arc::new(base_url),
- Arc::new(HashMap::new()),
- Arc::new(HudiConfigs::empty()),
- )
- .await
- .unwrap();
+ let fs_view = create_test_fs_view(base_url).await;
+
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
+
let excludes = &hudi_table
.timeline
.get_replaced_file_groups()
@@ -318,7 +310,7 @@ mod tests {
let partition_pruner = PartitionPruner::new(
&["byteField < 20", "shortField = 300"],
&partition_schema,
- hudi_table.configs.as_ref(),
+ hudi_table.hudi_configs.as_ref(),
)
.unwrap();
let file_slices = fs_view
@@ -327,6 +319,7 @@ mod tests {
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
assert_eq!(file_slices.len(), 1);
+
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 931f2c2..9ca5817 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -39,7 +39,7 @@
//!
//! pub async fn test() {
//! use arrow_schema::Schema;
-//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
//! let schema = hudi_table.get_schema().await.unwrap();
//! }
@@ -95,7 +95,6 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use strum::IntoEnumIterator;
use url::Url;
-
use HudiInternalConfig::SkipConfigValidation;
use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
use TableTypeValue::CopyOnWrite;
@@ -105,10 +104,11 @@ use crate::config::read::HudiReadConfig;
use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
+use crate::config::utils::parse_data_for_options;
+use crate::config::utils::{empty_options, split_hudi_options_from_others};
use crate::config::HudiConfigs;
use crate::config::HUDI_CONF_DIR;
use crate::file_group::FileSlice;
-use crate::storage::utils::{empty_options, parse_config_data, parse_uri};
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
@@ -121,9 +121,8 @@ mod timeline;
/// Hudi Table in-memory
#[derive(Clone, Debug)]
pub struct Table {
- pub base_url: Arc<Url>,
- pub configs: Arc<HudiConfigs>,
- pub extra_options: Arc<HashMap<String, String>>,
+ pub hudi_configs: Arc<HudiConfigs>,
+ pub storage_options: Arc<HashMap<String, String>>,
pub timeline: Timeline,
pub file_system_view: FileSystemView,
}
@@ -135,38 +134,38 @@ impl Table {
}
/// Create hudi table with options
- pub async fn new_with_options<I, K, V>(base_uri: &str, all_options: I) ->
Result<Self>
+ pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
- let base_url = Arc::new(parse_uri(base_uri)?);
-
- let (configs, extra_options) = Self::load_configs(base_url.clone(),
all_options)
+ let (hudi_configs, storage_options) = Self::load_configs(base_uri,
options)
.await
.context("Failed to load table properties")?;
- let configs = Arc::new(configs);
- let extra_options = Arc::new(extra_options);
+ let hudi_configs = Arc::new(hudi_configs);
+ let storage_options = Arc::new(storage_options);
- let timeline = Timeline::new(base_url.clone(), extra_options.clone(),
configs.clone())
+ let timeline = Timeline::new(hudi_configs.clone(),
storage_options.clone())
.await
.context("Failed to load timeline")?;
- let file_system_view =
- FileSystemView::new(base_url.clone(), extra_options.clone(),
configs.clone())
- .await
- .context("Failed to load file system view")?;
+ let file_system_view = FileSystemView::new(hudi_configs.clone(),
storage_options.clone())
+ .await
+ .context("Failed to load file system view")?;
Ok(Table {
- base_url,
- configs,
- extra_options,
+ hudi_configs,
+ storage_options,
timeline,
file_system_view,
})
}
+ pub fn base_url(&self) -> Result<Url> {
+ self.hudi_configs.get(HudiTableConfig::BasePath)?.to_url()
+ }
+
#[cfg(feature = "datafusion")]
pub fn register_storage(
&self,
@@ -181,7 +180,7 @@ impl Table {
}
async fn load_configs<I, K, V>(
- base_url: Arc<Url>,
+ base_uri: &str,
all_options: I,
) -> Result<(HudiConfigs, HashMap<String, String>)>
where
@@ -190,27 +189,32 @@ impl Table {
V: Into<String>,
{
let mut hudi_options = HashMap::new();
- let mut extra_options = HashMap::new();
+ let mut other_options = HashMap::new();
- Self::imbue_cloud_env_vars(&mut extra_options);
+ Self::imbue_cloud_env_vars(&mut other_options);
- for (k, v) in all_options {
- if k.as_ref().starts_with("hoodie.") {
- hudi_options.insert(k.as_ref().to_string(), v.into());
- } else {
- extra_options.insert(k.as_ref().to_string(), v.into());
- }
- }
+ let (hudi_opts, others) = split_hudi_options_from_others(all_options);
+ hudi_options.extend(hudi_opts);
+ other_options.extend(others);
+
+ hudi_options.insert(
+ HudiTableConfig::BasePath.as_ref().to_string(),
+ base_uri.to_string(),
+ );
- let storage = Storage::new(base_url, &extra_options)?;
+ // create a [Storage] instance to load properties from storage layer.
+ let storage = Storage::new(
+ Arc::new(other_options.clone()),
+ Arc::new(HudiConfigs::new(&hudi_options)),
+ )?;
Self::imbue_table_properties(&mut hudi_options,
storage.clone()).await?;
- Self::imbue_global_hudi_configs(&mut hudi_options,
storage.clone()).await?;
+ Self::imbue_global_hudi_configs_if_not_present(&mut hudi_options,
storage.clone()).await?;
let hudi_configs = HudiConfigs::new(hudi_options);
- Self::validate_configs(&hudi_configs).map(|_| (hudi_configs,
extra_options))
+ Self::validate_configs(&hudi_configs).map(|_| (hudi_configs,
other_options))
}
fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) {
@@ -230,7 +234,7 @@ impl Table {
storage: Arc<Storage>,
) -> Result<()> {
let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
- let table_properties = parse_config_data(&bytes, "=").await?;
+ let table_properties = parse_data_for_options(&bytes, "=")?;
// TODO: handle the case where the same key is present in both table
properties and options
for (k, v) in table_properties {
@@ -240,7 +244,7 @@ impl Table {
Ok(())
}
- async fn imbue_global_hudi_configs(
+ async fn imbue_global_hudi_configs_if_not_present(
options: &mut HashMap<String, String>,
storage: Arc<Storage>,
) -> Result<()> {
@@ -253,7 +257,7 @@ impl Table {
.get_file_data_from_absolute_path(global_config_path.to_str().unwrap())
.await
{
- if let Ok(global_configs) = parse_config_data(&bytes, "
\t=").await {
+ if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=")
{
for (key, value) in global_configs {
if key.starts_with("hoodie.") &&
!options.contains_key(&key) {
options.insert(key.to_string(), value.to_string());
@@ -313,7 +317,7 @@ impl Table {
/// Get the latest partition [Schema] of the table
pub async fn get_partition_schema(&self) -> Result<Schema> {
let partition_fields: HashSet<String> = self
- .configs
+ .hudi_configs
.get_or_default(PartitionFields)
.to::<Vec<String>>()
.into_iter()
@@ -350,7 +354,7 @@ impl Table {
///
/// If the [AsOfTimestamp] configuration is set, the file slices at the
specified timestamp will be returned.
pub async fn get_file_slices(&self, filters: &[&str]) ->
Result<Vec<FileSlice>> {
- if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+ if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.get_file_slices_as_of(timestamp.to::<String>().as_str(),
filters)
.await
} else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
@@ -369,7 +373,7 @@ impl Table {
let excludes = self.timeline.get_replaced_file_groups().await?;
let partition_schema = self.get_partition_schema().await?;
let partition_pruner =
- PartitionPruner::new(filters, &partition_schema,
self.configs.as_ref())?;
+ PartitionPruner::new(filters, &partition_schema,
self.hudi_configs.as_ref())?;
self.file_system_view
.get_file_slices_as_of(timestamp, &partition_pruner, &excludes)
.await
@@ -379,7 +383,7 @@ impl Table {
///
/// If the [AsOfTimestamp] configuration is set, the records at the
specified timestamp will be returned.
pub async fn read_snapshot(&self, filters: &[&str]) ->
Result<Vec<RecordBatch>> {
- if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+ if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.read_snapshot_as_of(timestamp.to::<String>().as_str(),
filters)
.await
} else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
@@ -555,7 +559,7 @@ mod tests {
#[tokio::test]
async fn validate_invalid_table_props() {
let table =
get_test_table_without_validation("table_props_invalid").await;
- let configs = table.configs;
+ let configs = table.hudi_configs;
assert!(
configs.validate(BaseFileFormat).is_err(),
"required config is missing"
@@ -603,7 +607,7 @@ mod tests {
#[tokio::test]
async fn get_invalid_table_props() {
let table =
get_test_table_without_validation("table_props_invalid").await;
- let configs = table.configs;
+ let configs = table.hudi_configs;
assert!(configs.get(BaseFileFormat).is_err());
assert!(configs.get(Checksum).is_err());
assert!(configs.get(DatabaseName).is_err());
@@ -624,7 +628,7 @@ mod tests {
#[tokio::test]
async fn get_default_for_invalid_table_props() {
let table =
get_test_table_without_validation("table_props_invalid").await;
- let configs = table.configs;
+ let configs = table.hudi_configs;
assert!(panic::catch_unwind(||
configs.get_or_default(BaseFileFormat)).is_err());
assert!(panic::catch_unwind(||
configs.get_or_default(Checksum)).is_err());
assert_eq!(
@@ -654,7 +658,7 @@ mod tests {
#[tokio::test]
async fn get_valid_table_props() {
let table =
get_test_table_without_validation("table_props_valid").await;
- let configs = table.configs;
+ let configs = table.hudi_configs;
assert_eq!(
configs.get(BaseFileFormat).unwrap().to::<String>(),
"parquet"
@@ -691,7 +695,7 @@ mod tests {
async fn get_global_table_props() {
// Without the environment variable HUDI_CONF_DIR
let table =
get_test_table_without_validation("table_props_partial").await;
- let configs = table.configs;
+ let configs = table.hudi_configs;
assert!(configs.get(DatabaseName).is_err());
assert!(configs.get(TableType).is_err());
assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
@@ -701,7 +705,7 @@ mod tests {
let hudi_conf_dir = base_path.join("random/wrong/dir");
env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
let table =
get_test_table_without_validation("table_props_partial").await;
- let configs = table.configs;
+ let configs = table.hudi_configs;
assert!(configs.get(DatabaseName).is_err());
assert!(configs.get(TableType).is_err());
assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
@@ -711,7 +715,7 @@ mod tests {
let hudi_conf_dir = base_path.join("tests/data/hudi_conf_dir");
env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
let table =
get_test_table_without_validation("table_props_partial").await;
- let configs = table.configs;
+ let configs = table.hudi_configs;
assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "tmpdb");
assert_eq!(
configs.get(TableType).unwrap().to::<String>(),
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
index e34b738..7dcf482 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -406,16 +406,10 @@ mod tests {
}
fn create_hudi_configs(is_hive_style: bool, is_url_encoded: bool) ->
HudiConfigs {
- HudiConfigs::new(HashMap::from([
- (
- IsHiveStylePartitioning.as_ref().to_string(),
- is_hive_style.to_string(),
- ),
- (
- IsPartitionPathUrlencoded.as_ref().to_string(),
- is_url_encoded.to_string(),
- ),
- ]))
+ HudiConfigs::new([
+ (IsHiveStylePartitioning, is_hive_style.to_string()),
+ (IsPartitionPathUrlencoded, is_url_encoded.to_string()),
+ ])
}
#[test]
fn test_partition_pruner_new() {
diff --git a/crates/core/src/table/timeline.rs
b/crates/core/src/table/timeline.rs
index e502cd6..6dc9df5 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -27,7 +27,6 @@ use anyhow::{anyhow, Context, Result};
use arrow_schema::Schema;
use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
-use url::Url;
use crate::config::HudiConfigs;
use crate::file_group::FileGroup;
@@ -91,22 +90,21 @@ impl Instant {
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct Timeline {
- configs: Arc<HudiConfigs>,
+ hudi_configs: Arc<HudiConfigs>,
pub(crate) storage: Arc<Storage>,
pub instants: Vec<Instant>,
}
impl Timeline {
pub async fn new(
- base_url: Arc<Url>,
+ hudi_configs: Arc<HudiConfigs>,
storage_options: Arc<HashMap<String, String>>,
- configs: Arc<HudiConfigs>,
) -> Result<Self> {
- let storage = Storage::new(base_url, &storage_options)?;
+ let storage = Storage::new(storage_options.clone(),
hudi_configs.clone())?;
let instants = Self::load_completed_commits(&storage).await?;
Ok(Self {
storage,
- configs,
+ hudi_configs,
instants,
})
}
@@ -222,19 +220,23 @@ mod tests {
use hudi_tests::TestTable;
+ use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
use crate::table::timeline::{Instant, State, Timeline};
- #[tokio::test]
- async fn timeline_read_latest_schema() {
- let base_url = TestTable::V6Nonpartitioned.url();
- let timeline = Timeline::new(
- Arc::new(base_url),
+ async fn create_test_timeline(base_url: Url) -> Timeline {
+ Timeline::new(
+ Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath,
base_url)])),
Arc::new(HashMap::new()),
- Arc::new(HudiConfigs::empty()),
)
.await
- .unwrap();
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn timeline_read_latest_schema() {
+ let base_url = TestTable::V6Nonpartitioned.url();
+ let timeline = create_test_timeline(base_url).await;
let table_schema = timeline.get_latest_schema().await.unwrap();
assert_eq!(table_schema.fields.len(), 21)
}
@@ -242,13 +244,7 @@ mod tests {
#[tokio::test]
async fn timeline_read_latest_schema_from_empty_table() {
let base_url = TestTable::V6Empty.url();
- let timeline = Timeline::new(
- Arc::new(base_url),
- Arc::new(HashMap::new()),
- Arc::new(HudiConfigs::empty()),
- )
- .await
- .unwrap();
+ let timeline = create_test_timeline(base_url).await;
let table_schema = timeline.get_latest_schema().await;
assert!(table_schema.is_err());
assert_eq!(
@@ -263,13 +259,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let timeline = Timeline::new(
- Arc::new(base_url),
- Arc::new(HashMap::new()),
- Arc::new(HudiConfigs::empty()),
- )
- .await
- .unwrap();
+ let timeline = create_test_timeline(base_url).await;
assert_eq!(
timeline.instants,
vec![
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 6aa2035..b090696 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -39,7 +39,8 @@ use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::create_physical_expr;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::storage::utils::{empty_options, get_scheme_authority,
parse_uri};
+use hudi_core::config::utils::empty_options;
+use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
use hudi_core::table::Table as HudiTable;
#[derive(Clone, Debug)]
@@ -66,7 +67,7 @@ impl HudiDataSource {
fn get_input_partitions(&self) -> usize {
self.table
- .configs
+ .hudi_configs
.get_or_default(InputPartitions)
.to::<usize>()
}
@@ -119,7 +120,13 @@ impl TableProvider for HudiDataSource {
parquet_file_groups.push(parquet_file_group_vec)
}
- let url =
ObjectStoreUrl::parse(get_scheme_authority(&self.table.base_url))?;
+ let base_url = self.table.base_url().map_err(|e| {
+ Execution(format!(
+ "Failed to get base path config from Hudi table: {}",
+ e
+ ))
+ })?;
+ let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
let fsc = FileScanConfig::new(url, self.schema())
.with_file_groups(parquet_file_groups)
.with_projection(projection.cloned())
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index bc6023b..47da6aa 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -37,7 +37,7 @@ class HudiFileSlice:
class HudiTable:
def __init__(
self,
- table_uri: str,
+ base_uri: str,
options: Optional[Dict[str, str]] = None,
): ...
def get_schema(self) -> "pyarrow.Schema": ...
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 70b9f56..4c448b5 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -28,6 +28,12 @@ use tokio::runtime::Runtime;
use hudi::file_group::FileSlice;
use hudi::table::Table;
+macro_rules! vec_string_to_slice {
+ ($vec:expr) => {
+ &$vec.iter().map(AsRef::as_ref).collect::<Vec<_>>()
+ };
+}
+
#[cfg(not(tarpaulin))]
#[derive(Clone, Debug)]
#[pyclass]
@@ -85,37 +91,34 @@ fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
}
}
-macro_rules! vec_string_to_slice {
- ($vec:expr) => {
- &$vec.iter().map(AsRef::as_ref).collect::<Vec<_>>()
- };
-}
-
#[cfg(not(tarpaulin))]
#[pyclass]
pub struct HudiTable {
- _table: Table,
+ inner: Table,
}
#[cfg(not(tarpaulin))]
#[pymethods]
impl HudiTable {
#[new]
- #[pyo3(signature = (table_uri, options = None))]
- fn new(table_uri: &str, options: Option<HashMap<String, String>>) ->
PyResult<Self> {
- let _table = rt().block_on(Table::new_with_options(
- table_uri,
+ #[pyo3(signature = (base_uri, options=None))]
+ fn new_with_options(
+ base_uri: &str,
+ options: Option<HashMap<String, String>>,
+ ) -> PyResult<Self> {
+ let inner: Table = rt().block_on(Table::new_with_options(
+ base_uri,
options.unwrap_or_default(),
))?;
- Ok(HudiTable { _table })
+ Ok(HudiTable { inner })
}
fn get_schema(&self, py: Python) -> PyResult<PyObject> {
- rt().block_on(self._table.get_schema())?.to_pyarrow(py)
+ rt().block_on(self.inner.get_schema())?.to_pyarrow(py)
}
fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
- rt().block_on(self._table.get_partition_schema())?
+ rt().block_on(self.inner.get_partition_schema())?
.to_pyarrow(py)
}
@@ -128,7 +131,7 @@ impl HudiTable {
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
py.allow_threads(|| {
let file_slices = rt().block_on(
- self._table
+ self.inner
.split_file_slices(n,
vec_string_to_slice!(filters.unwrap_or_default())),
)?;
Ok(file_slices
@@ -146,7 +149,7 @@ impl HudiTable {
) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
let file_slices = rt().block_on(
- self._table
+ self.inner
.get_file_slices(vec_string_to_slice!(filters.unwrap_or_default())),
)?;
Ok(file_slices.iter().map(convert_file_slice).collect())
@@ -154,14 +157,14 @@ impl HudiTable {
}
fn read_file_slice(&self, relative_path: &str, py: Python) ->
PyResult<PyObject> {
- rt().block_on(self._table.read_file_slice_by_path(relative_path))?
+ rt().block_on(self.inner.read_file_slice_by_path(relative_path))?
.to_pyarrow(py)
}
#[pyo3(signature = (filters=None))]
fn read_snapshot(&self, filters: Option<Vec<String>>, py: Python) ->
PyResult<PyObject> {
rt().block_on(
- self._table
+ self.inner
.read_snapshot(vec_string_to_slice!(filters.unwrap_or_default())),
)?
.to_pyarrow(py)