This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 2fcc964 feat: add `HudiConfigs` for parsing and managing named
configs (#37)
2fcc964 is described below
commit 2fcc964e945bbc9129af1955aa6b623f4a1367ed
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jul 5 05:07:30 2024 -0500
feat: add `HudiConfigs` for parsing and managing named configs (#37)
- introduce trait `ConfigParser` and `HudiConfigs`
- introduce enums `HudiReadConfig` and `HudiTableConfig` to manage
different categories of configs
- make use of `HudiConfigs` and config enums to parse, validate, and
retrieve hudi configs
Fixes #38
---
crates/core/Cargo.toml | 1 +
crates/core/src/config/mod.rs | 171 +++++++-----
crates/core/src/config/read.rs | 101 +++++++
crates/core/src/config/table.rs | 226 ++++++++++++++++
crates/core/src/table/config.rs | 144 ----------
crates/core/src/table/fs_view.rs | 10 +-
crates/core/src/table/metadata.rs | 55 ----
crates/core/src/table/mod.rs | 292 +++++++++++----------
crates/core/src/table/timeline.rs | 14 +-
.../.hoodie/hoodie.properties | 27 +-
.../.hoodie/hoodie.properties | 3 +-
crates/datafusion/src/lib.rs | 20 +-
crates/tests/src/utils.rs | 14 +
13 files changed, 639 insertions(+), 439 deletions(-)
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 5d04337..06e15a4 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -61,6 +61,7 @@ bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
hashbrown = "0.14.3"
regex = { workspace = true }
+strum_macros = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 3322df3..2399f93 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -16,103 +16,144 @@
* specific language governing permissions and limitations
* under the License.
*/
+use std::any::type_name;
use std::collections::HashMap;
+use std::sync::Arc;
-use anyhow::{anyhow, Context, Result};
+use anyhow::Result;
-pub trait OptionsParser {
+pub mod read;
+pub mod table;
+
+pub trait ConfigParser: AsRef<str> {
type Output;
- fn parse_value(&self, options: &HashMap<String, String>) ->
Result<Self::Output>;
+ fn default_value(&self) -> Option<Self::Output>;
- fn parse_value_or_default(&self, options: &HashMap<String, String>) ->
Self::Output;
-}
+ fn is_required(&self) -> bool {
+ false
+ }
+
+ fn validate(&self, configs: &HashMap<String, String>) -> Result<()> {
+ match self.parse_value(configs) {
+ Ok(_) => Ok(()),
+ Err(e) => {
+ if !self.is_required() && e.to_string().ends_with("not found")
{
+ // TODO: introduce error type to avoid checking "not found"
+ Ok(())
+ } else {
+ Err(e)
+ }
+ }
+ }
+ }
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
-pub enum HudiConfig {
- ReadInputPartitions,
+ fn parse_value(&self, configs: &HashMap<String, String>) ->
Result<Self::Output>;
+
+ fn parse_value_or_default(&self, configs: &HashMap<String, String>) ->
Self::Output {
+ self.parse_value(configs).unwrap_or_else(|_| {
+ self.default_value()
+ .unwrap_or_else(|| panic!("No default value for config '{}'",
self.as_ref()))
+ })
+ }
}
#[derive(Debug)]
pub enum HudiConfigValue {
+ Boolean(bool),
Integer(isize),
+ UInteger(usize),
+ String(String),
+ List(Vec<String>),
}
impl HudiConfigValue {
- pub fn cast<T: 'static + TryFrom<isize> + TryFrom<usize> +
std::fmt::Debug>(&self) -> T {
- match self {
- HudiConfigValue::Integer(value) =>
T::try_from(*value).unwrap_or_else(|_| {
- panic!("Failed to convert isize to {}",
std::any::type_name::<T>())
- }),
+ pub fn to<T: 'static + std::fmt::Debug + From<HudiConfigValue>>(self) -> T
{
+ T::from(self)
+ }
+}
+
+impl From<HudiConfigValue> for bool {
+ fn from(value: HudiConfigValue) -> Self {
+ match value {
+ HudiConfigValue::Boolean(v) => v,
+ _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
}
}
}
-impl HudiConfig {
- fn default_value(&self) -> Option<HudiConfigValue> {
- match self {
- Self::ReadInputPartitions => Some(HudiConfigValue::Integer(0)),
+impl From<HudiConfigValue> for isize {
+ fn from(value: HudiConfigValue) -> Self {
+ match value {
+ HudiConfigValue::Integer(v) => v,
+ _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
}
}
}
-impl AsRef<str> for HudiConfig {
- fn as_ref(&self) -> &str {
- match self {
- Self::ReadInputPartitions => "hoodie.read.input.partitions",
+impl From<HudiConfigValue> for usize {
+ fn from(value: HudiConfigValue) -> Self {
+ match value {
+ HudiConfigValue::UInteger(v) => v,
+ _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
}
}
}
-impl OptionsParser for HudiConfig {
- type Output = HudiConfigValue;
-
- fn parse_value(&self, options: &HashMap<String, String>) ->
Result<Self::Output> {
- match self {
- HudiConfig::ReadInputPartitions =>
options.get(self.as_ref()).map_or_else(
- || Err(anyhow!("Config '{}' not found", self.as_ref())),
- |v| {
- v.parse::<isize>()
- .map(HudiConfigValue::Integer)
- .with_context(|| {
- format!("Failed to parse '{}' for config '{}'", v,
self.as_ref())
- })
- },
- ),
+impl From<HudiConfigValue> for String {
+ fn from(value: HudiConfigValue) -> Self {
+ match value {
+ HudiConfigValue::Boolean(v) => v.to_string(),
+ HudiConfigValue::Integer(v) => v.to_string(),
+ HudiConfigValue::UInteger(v) => v.to_string(),
+ HudiConfigValue::String(v) => v,
+ _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
}
}
+}
- fn parse_value_or_default(&self, options: &HashMap<String, String>) ->
Self::Output {
- self.parse_value(options).unwrap_or_else(|_| {
- self.default_value()
- .unwrap_or_else(|| panic!("No default value for config '{}'",
self.as_ref()))
- })
+impl From<HudiConfigValue> for Vec<String> {
+ fn from(value: HudiConfigValue) -> Self {
+ match value {
+ HudiConfigValue::List(v) => v,
+ _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
+ }
}
}
-#[cfg(test)]
-mod tests {
- use crate::config::HudiConfig::ReadInputPartitions;
- use crate::config::OptionsParser;
- use std::collections::HashMap;
-
- #[test]
- fn parse_invalid_config_value() {
- let options =
- HashMap::from([(ReadInputPartitions.as_ref().to_string(),
"foo".to_string())]);
- let value = ReadInputPartitions.parse_value(&options);
- assert_eq!(
- value.err().unwrap().to_string(),
- format!(
- "Failed to parse 'foo' for config '{}'",
- ReadInputPartitions.as_ref()
- )
- );
- assert_eq!(
- ReadInputPartitions
- .parse_value_or_default(&options)
- .cast::<isize>(),
- 0
- );
+#[derive(Clone, Debug)]
+pub struct HudiConfigs {
+ pub raw_configs: Arc<HashMap<String, String>>,
+}
+
+impl HudiConfigs {
+ pub fn new(raw_configs: HashMap<String, String>) -> Self {
+ Self {
+ raw_configs: Arc::new(raw_configs),
+ }
+ }
+
+ pub fn empty() -> Self {
+ Self {
+ raw_configs: Arc::new(HashMap::new()),
+ }
+ }
+
+ pub fn validate(&self, parser: impl ConfigParser<Output =
HudiConfigValue>) -> Result<()> {
+ parser.validate(&self.raw_configs)
+ }
+
+ pub fn get(
+ &self,
+ parser: impl ConfigParser<Output = HudiConfigValue>,
+ ) -> Result<HudiConfigValue> {
+ parser.parse_value(&self.raw_configs)
+ }
+
+ pub fn get_or_default(
+ &self,
+ parser: impl ConfigParser<Output = HudiConfigValue>,
+ ) -> HudiConfigValue {
+ parser.parse_value_or_default(&self.raw_configs)
}
}
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
new file mode 100644
index 0000000..43879de
--- /dev/null
+++ b/crates/core/src/config/read.rs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use anyhow::{anyhow, Result};
+
+use crate::config::{ConfigParser, HudiConfigValue};
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub enum HudiReadConfig {
+ InputPartitions,
+}
+
+impl AsRef<str> for HudiReadConfig {
+ fn as_ref(&self) -> &str {
+ match self {
+ Self::InputPartitions => "hoodie.read.input.partitions",
+ }
+ }
+}
+
+impl ConfigParser for HudiReadConfig {
+ type Output = HudiConfigValue;
+
+ fn default_value(&self) -> Option<HudiConfigValue> {
+ match self {
+ HudiReadConfig::InputPartitions =>
Some(HudiConfigValue::UInteger(0usize)),
+ }
+ }
+
+ fn parse_value(&self, configs: &HashMap<String, String>) ->
Result<Self::Output> {
+ let get_result = configs
+ .get(self.as_ref())
+ .map(|v| v.as_str())
+ .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+
+ match self {
+ HudiReadConfig::InputPartitions => get_result
+ .and_then(|v| {
+ usize::from_str(v).map_err(|e| {
+ anyhow!(
+ "Failed to parse '{}' for config '{}': {}",
+ v,
+ self.as_ref(),
+ e
+ )
+ })
+ })
+ .map(HudiConfigValue::UInteger),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+
+ use crate::config::read::HudiReadConfig::InputPartitions;
+ use crate::config::ConfigParser;
+
+ #[test]
+ fn parse_valid_config_value() {
+ let options = HashMap::from([(InputPartitions.as_ref().to_string(),
"100".to_string())]);
+ let value =
InputPartitions.parse_value(&options).unwrap().to::<usize>();
+ assert_eq!(value, 100usize);
+ }
+
+ #[test]
+ fn parse_invalid_config_value() {
+ let options = HashMap::from([(InputPartitions.as_ref().to_string(),
"foo".to_string())]);
+ let value = InputPartitions.parse_value(&options);
+ assert!(value.err().unwrap().to_string().starts_with(&format!(
+ "Failed to parse 'foo' for config '{}'",
+ InputPartitions.as_ref()
+ )));
+ assert_eq!(
+ InputPartitions
+ .parse_value_or_default(&options)
+ .to::<usize>(),
+ 0
+ );
+ }
+}
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
new file mode 100644
index 0000000..d503e32
--- /dev/null
+++ b/crates/core/src/config/table.rs
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use anyhow::anyhow;
+use anyhow::Result;
+use strum_macros::AsRefStr;
+
+use crate::config::{ConfigParser, HudiConfigValue};
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub enum HudiTableConfig {
+ BaseFileFormat,
+ Checksum,
+ DatabaseName,
+ DropsPartitionFields,
+ IsHiveStylePartitioning,
+ IsPartitionPathUrlencoded,
+ KeyGeneratorClass,
+ PartitionFields,
+ PrecombineField,
+ PopulatesMetaFields,
+ RecordKeyFields,
+ TableName,
+ TableType,
+ TableVersion,
+ TimelineLayoutVersion,
+}
+
+impl AsRef<str> for HudiTableConfig {
+ fn as_ref(&self) -> &str {
+ match self {
+ Self::BaseFileFormat => "hoodie.table.base.file.format",
+ Self::Checksum => "hoodie.table.checksum",
+ Self::DatabaseName => "hoodie.database.name",
+ Self::DropsPartitionFields =>
"hoodie.datasource.write.drop.partition.columns",
+ Self::IsHiveStylePartitioning =>
"hoodie.datasource.write.hive_style_partitioning",
+ Self::IsPartitionPathUrlencoded =>
"hoodie.datasource.write.partitionpath.urlencode",
+ Self::KeyGeneratorClass => "hoodie.table.keygenerator.class",
+ Self::PartitionFields => "hoodie.table.partition.fields",
+ Self::PrecombineField => "hoodie.table.precombine.field",
+ Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
+ Self::RecordKeyFields => "hoodie.table.recordkey.fields",
+ Self::TableName => "hoodie.table.name",
+ Self::TableType => "hoodie.table.type",
+ Self::TableVersion => "hoodie.table.version",
+ Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
+ }
+ }
+}
+
+impl ConfigParser for HudiTableConfig {
+ type Output = HudiConfigValue;
+
+ fn default_value(&self) -> Option<Self::Output> {
+ match self {
+ Self::DatabaseName =>
Some(HudiConfigValue::String("default".to_string())),
+ Self::DropsPartitionFields =>
Some(HudiConfigValue::Boolean(false)),
+ Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
+ _ => None,
+ }
+ }
+
+ fn is_required(&self) -> bool {
+ matches!(
+ self,
+ Self::BaseFileFormat | Self::TableName | Self::TableType |
Self::TableVersion
+ )
+ }
+
+ fn parse_value(&self, configs: &HashMap<String, String>) ->
Result<Self::Output> {
+ let get_result = configs
+ .get(self.as_ref())
+ .map(|v| v.as_str())
+ .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+
+ match self {
+ Self::BaseFileFormat => get_result
+ .and_then(BaseFileFormatValue::from_str)
+ .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
+ Self::Checksum => get_result
+ .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+ .map(HudiConfigValue::Integer),
+ Self::DatabaseName => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
+ Self::DropsPartitionFields => get_result
+ .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .map(HudiConfigValue::Boolean),
+ Self::IsHiveStylePartitioning => get_result
+ .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .map(HudiConfigValue::Boolean),
+ Self::IsPartitionPathUrlencoded => get_result
+ .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .map(HudiConfigValue::Boolean),
+ Self::KeyGeneratorClass => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
+ Self::PartitionFields => get_result
+ .map(|v|
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
+ Self::PrecombineField => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
+ Self::PopulatesMetaFields => get_result
+ .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .map(HudiConfigValue::Boolean),
+ Self::RecordKeyFields => get_result
+ .map(|v|
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
+ Self::TableName => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
+ Self::TableType => get_result
+ .and_then(TableTypeValue::from_str)
+ .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
+ Self::TableVersion => get_result
+ .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+ .map(HudiConfigValue::Integer),
+ Self::TimelineLayoutVersion => get_result
+ .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+ .map(HudiConfigValue::Integer),
+ }
+ }
+}
+
+#[derive(Clone, Debug, PartialEq, AsRefStr)]
+pub enum TableTypeValue {
+ #[strum(serialize = "COPY_ON_WRITE")]
+ CopyOnWrite,
+ #[strum(serialize = "MERGE_ON_READ")]
+ MergeOnRead,
+}
+
+impl FromStr for TableTypeValue {
+ type Err = anyhow::Error;
+
+ fn from_str(s: &str) -> Result<Self> {
+ match s.to_ascii_lowercase().as_str() {
+ "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
+ "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
+ _ => Err(anyhow!("Unsupported table type: {}", s)),
+ }
+ }
+}
+
+#[derive(Clone, Debug, PartialEq, AsRefStr)]
+pub enum BaseFileFormatValue {
+ #[strum(serialize = "parquet")]
+ Parquet,
+}
+
+impl FromStr for BaseFileFormatValue {
+ type Err = anyhow::Error;
+
+ fn from_str(s: &str) -> Result<Self> {
+ match s.to_ascii_lowercase().as_str() {
+ "parquet" => Ok(Self::Parquet),
+ _ => Err(anyhow!("Unsupported base file format: {}", s)),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::str::FromStr;
+
+ use crate::config::table::{BaseFileFormatValue, TableTypeValue};
+
+ #[test]
+ fn create_table_type() {
+ assert_eq!(
+ TableTypeValue::from_str("cow").unwrap(),
+ TableTypeValue::CopyOnWrite
+ );
+ assert_eq!(
+ TableTypeValue::from_str("copy_on_write").unwrap(),
+ TableTypeValue::CopyOnWrite
+ );
+ assert_eq!(
+ TableTypeValue::from_str("COPY-ON-WRITE").unwrap(),
+ TableTypeValue::CopyOnWrite
+ );
+ assert_eq!(
+ TableTypeValue::from_str("MOR").unwrap(),
+ TableTypeValue::MergeOnRead
+ );
+ assert_eq!(
+ TableTypeValue::from_str("Merge_on_read").unwrap(),
+ TableTypeValue::MergeOnRead
+ );
+ assert_eq!(
+ TableTypeValue::from_str("Merge-on-read").unwrap(),
+ TableTypeValue::MergeOnRead
+ );
+ assert!(TableTypeValue::from_str("").is_err());
+ assert!(TableTypeValue::from_str("copyonwrite").is_err());
+ assert!(TableTypeValue::from_str("MERGEONREAD").is_err());
+ assert!(TableTypeValue::from_str("foo").is_err());
+ }
+
+ #[test]
+ fn create_base_file_format() {
+ assert_eq!(
+ BaseFileFormatValue::from_str("parquet").unwrap(),
+ BaseFileFormatValue::Parquet
+ );
+ assert_eq!(
+ BaseFileFormatValue::from_str("PArquet").unwrap(),
+ BaseFileFormatValue::Parquet
+ );
+ assert!(TableTypeValue::from_str("").is_err());
+ assert!(
+ TableTypeValue::from_str("orc").is_err(),
+ "orc is not yet supported."
+ );
+ }
+}
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
deleted file mode 100644
index 6a1a6ce..0000000
--- a/crates/core/src/table/config.rs
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use anyhow::anyhow;
-use anyhow::Result;
-use std::str::FromStr;
-
-pub enum ConfigKey {
- BaseFileFormat,
- Checksum,
- DatabaseName,
- DropsPartitionFields,
- IsHiveStylePartitioning,
- IsPartitionPathUrlencoded,
- KeyGeneratorClass,
- PartitionFields,
- PrecombineField,
- PopulatesMetaFields,
- RecordKeyFields,
- TableName,
- TableType,
- TableVersion,
- TimelineLayoutVersion,
-}
-
-impl AsRef<str> for ConfigKey {
- fn as_ref(&self) -> &str {
- match self {
- Self::BaseFileFormat => "hoodie.table.base.file.format",
- Self::Checksum => "hoodie.table.checksum",
- Self::DatabaseName => "hoodie.database.name",
- Self::DropsPartitionFields =>
"hoodie.datasource.write.drop.partition.columns",
- Self::IsHiveStylePartitioning =>
"hoodie.datasource.write.hive_style_partitioning",
- Self::IsPartitionPathUrlencoded =>
"hoodie.datasource.write.partitionpath.urlencode",
- Self::KeyGeneratorClass => "hoodie.table.keygenerator.class",
- Self::PartitionFields => "hoodie.table.partition.fields",
- Self::PrecombineField => "hoodie.table.precombine.field",
- Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
- Self::RecordKeyFields => "hoodie.table.recordkey.fields",
- Self::TableName => "hoodie.table.name",
- Self::TableType => "hoodie.table.type",
- Self::TableVersion => "hoodie.table.version",
- Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
- }
- }
-}
-
-#[derive(Debug, PartialEq)]
-pub enum TableType {
- CopyOnWrite,
- MergeOnRead,
-}
-
-impl FromStr for TableType {
- type Err = anyhow::Error;
-
- fn from_str(s: &str) -> Result<Self> {
- match s.to_ascii_lowercase().as_str() {
- "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
- "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
- _ => Err(anyhow!("Unsupported table type: {}", s)),
- }
- }
-}
-
-#[derive(Debug, PartialEq)]
-pub enum BaseFileFormat {
- Parquet,
-}
-
-impl FromStr for BaseFileFormat {
- type Err = anyhow::Error;
-
- fn from_str(s: &str) -> Result<Self> {
- match s.to_ascii_lowercase().as_str() {
- "parquet" => Ok(Self::Parquet),
- _ => Err(anyhow!("Unsupported base file format: {}", s)),
- }
- }
-}
-#[cfg(test)]
-mod tests {
- use crate::table::config::{BaseFileFormat, TableType};
- use std::str::FromStr;
-
- #[test]
- fn create_table_type() {
- assert_eq!(TableType::from_str("cow").unwrap(),
TableType::CopyOnWrite);
- assert_eq!(
- TableType::from_str("copy_on_write").unwrap(),
- TableType::CopyOnWrite
- );
- assert_eq!(
- TableType::from_str("COPY-ON-WRITE").unwrap(),
- TableType::CopyOnWrite
- );
- assert_eq!(TableType::from_str("MOR").unwrap(),
TableType::MergeOnRead);
- assert_eq!(
- TableType::from_str("Merge_on_read").unwrap(),
- TableType::MergeOnRead
- );
- assert_eq!(
- TableType::from_str("Merge-on-read").unwrap(),
- TableType::MergeOnRead
- );
- assert!(TableType::from_str("").is_err());
- assert!(TableType::from_str("copyonwrite").is_err());
- assert!(TableType::from_str("MERGEONREAD").is_err());
- assert!(TableType::from_str("foo").is_err());
- }
-
- #[test]
- fn create_base_file_format() {
- assert_eq!(
- BaseFileFormat::from_str("parquet").unwrap(),
- BaseFileFormat::Parquet
- );
- assert_eq!(
- BaseFileFormat::from_str("PArquet").unwrap(),
- BaseFileFormat::Parquet
- );
- assert!(TableType::from_str("").is_err());
- assert!(
- TableType::from_str("orc").is_err(),
- "orc is not yet supported."
- );
- }
-}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index ad4f5f5..02d3ca4 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,6 +25,7 @@ use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
use url::Url;
+use crate::config::HudiConfigs;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
@@ -32,7 +33,7 @@ use crate::storage::{get_leaf_dirs, Storage};
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct FileSystemView {
- props: Arc<HashMap<String, String>>,
+ configs: Arc<HudiConfigs>,
storage: Arc<Storage>,
partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
}
@@ -41,7 +42,7 @@ impl FileSystemView {
pub async fn new(
base_url: Arc<Url>,
storage_options: Arc<HashMap<String, String>>,
- props: Arc<HashMap<String, String>>,
+ configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, storage_options)?;
let partition_paths = Self::load_partition_paths(&storage).await?;
@@ -49,7 +50,7 @@ impl FileSystemView {
Self::load_file_groups_for_partitions(&storage,
partition_paths).await?;
let partition_to_file_groups =
Arc::new(DashMap::from_iter(partition_to_file_groups));
Ok(FileSystemView {
- props,
+ configs,
storage,
partition_to_file_groups,
})
@@ -168,6 +169,7 @@ mod tests {
use hudi_tests::TestTable;
+ use crate::config::HudiConfigs;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
@@ -208,7 +210,7 @@ mod tests {
let fs_view = FileSystemView::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
- Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();
diff --git a/crates/core/src/table/metadata.rs
b/crates/core/src/table/metadata.rs
deleted file mode 100644
index fb2d41b..0000000
--- a/crates/core/src/table/metadata.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-use crate::table::config::{BaseFileFormat, TableType};
-
-pub trait ProvidesTableMetadata {
- fn base_file_format(&self) -> BaseFileFormat;
-
- fn checksum(&self) -> i64;
-
- fn database_name(&self) -> String;
-
- fn drops_partition_fields(&self) -> bool;
-
- fn is_hive_style_partitioning(&self) -> bool;
-
- fn is_partition_path_urlencoded(&self) -> bool;
-
- fn is_partitioned(&self) -> bool;
-
- fn key_generator_class(&self) -> String;
-
- fn location(&self) -> String;
-
- fn partition_fields(&self) -> Vec<String>;
-
- fn precombine_field(&self) -> String;
-
- fn populates_meta_fields(&self) -> bool;
-
- fn record_key_fields(&self) -> Vec<String>;
-
- fn table_name(&self) -> String;
-
- fn table_type(&self) -> TableType;
-
- fn table_version(&self) -> u32;
-
- fn timeline_layout_version(&self) -> u32;
-}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 849f0e0..e09fd10 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -19,7 +19,6 @@
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
-use std::str::FromStr;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
@@ -27,25 +26,21 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use url::Url;
+use crate::config::HudiConfigs;
use crate::file_group::FileSlice;
use crate::storage::utils::parse_uri;
use crate::storage::Storage;
-use crate::table::config::BaseFileFormat;
-use crate::table::config::{ConfigKey, TableType};
use crate::table::fs_view::FileSystemView;
-use crate::table::metadata::ProvidesTableMetadata;
use crate::table::timeline::Timeline;
-mod config;
mod fs_view;
-mod metadata;
mod timeline;
-#[derive(Debug, Clone)]
+#[derive(Clone, Debug)]
pub struct Table {
pub base_url: Arc<Url>,
pub storage_options: Arc<HashMap<String, String>>,
- pub props: Arc<HashMap<String, String>>,
+ pub configs: Arc<HudiConfigs>,
pub timeline: Timeline,
pub file_system_view: FileSystemView,
}
@@ -55,24 +50,24 @@ impl Table {
let base_url = Arc::new(parse_uri(base_uri)?);
let storage_options = Arc::new(storage_options);
- let props = Self::load_properties(base_url.clone(),
storage_options.clone())
+ let configs = Self::load_properties(base_url.clone(),
storage_options.clone())
.await
.context("Failed to load table properties")?;
+ let configs = Arc::new(configs);
- let props = Arc::new(props);
- let timeline = Timeline::new(base_url.clone(),
storage_options.clone(), props.clone())
+ let timeline = Timeline::new(base_url.clone(),
storage_options.clone(), configs.clone())
.await
.context("Failed to load timeline")?;
let file_system_view =
- FileSystemView::new(base_url.clone(), storage_options.clone(),
props.clone())
+ FileSystemView::new(base_url.clone(), storage_options.clone(),
configs.clone())
.await
.context("Failed to load file system view")?;
Ok(Table {
base_url,
storage_options,
- props,
+ configs,
timeline,
file_system_view,
})
@@ -81,7 +76,7 @@ impl Table {
async fn load_properties(
base_url: Arc<Url>,
storage_options: Arc<HashMap<String, String>>,
- ) -> Result<HashMap<String, String>> {
+ ) -> Result<HudiConfigs> {
let storage = Storage::new(base_url, storage_options)?;
let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
let cursor = std::io::Cursor::new(data);
@@ -98,14 +93,7 @@ impl Table {
let value = parts.next().unwrap_or("").to_owned();
properties.insert(key, value);
}
- Ok(properties)
- }
-
- pub fn get_property(&self, key: &str) -> &str {
- match self.props.get(key) {
- Some(value) => value,
- None => panic!("Failed to retrieve property {}", key),
- }
+ Ok(HudiConfigs::new(properties))
}
pub async fn get_schema(&self) -> Result<Schema> {
@@ -178,101 +166,24 @@ impl Table {
}
}
-impl ProvidesTableMetadata for Table {
- fn base_file_format(&self) -> BaseFileFormat {
-
BaseFileFormat::from_str(self.get_property(ConfigKey::BaseFileFormat.as_ref())).unwrap()
- }
-
- fn checksum(&self) -> i64 {
- i64::from_str(self.get_property(ConfigKey::Checksum.as_ref())).unwrap()
- }
-
- fn database_name(&self) -> String {
- match self.props.get(ConfigKey::DatabaseName.as_ref()) {
- Some(value) => value.to_string(),
- None => "default".to_string(),
- }
- }
-
- fn drops_partition_fields(&self) -> bool {
-
bool::from_str(self.get_property(ConfigKey::DropsPartitionFields.as_ref())).unwrap()
- }
-
- fn is_hive_style_partitioning(&self) -> bool {
-
bool::from_str(self.get_property(ConfigKey::IsHiveStylePartitioning.as_ref())).unwrap()
- }
-
- fn is_partition_path_urlencoded(&self) -> bool {
-
bool::from_str(self.get_property(ConfigKey::IsPartitionPathUrlencoded.as_ref())).unwrap()
- }
-
- fn is_partitioned(&self) -> bool {
- !self
- .key_generator_class()
- .ends_with("NonpartitionedKeyGenerator")
- }
-
- fn key_generator_class(&self) -> String {
- self.get_property(ConfigKey::KeyGeneratorClass.as_ref())
- .to_string()
- }
-
- fn location(&self) -> String {
- self.base_url.path().to_string()
- }
-
- fn partition_fields(&self) -> Vec<String> {
- self.get_property(ConfigKey::PartitionFields.as_ref())
- .split(',')
- .map(str::to_string)
- .collect()
- }
-
- fn precombine_field(&self) -> String {
- self.get_property(ConfigKey::PrecombineField.as_ref())
- .to_string()
- }
-
- fn populates_meta_fields(&self) -> bool {
-
bool::from_str(self.get_property(ConfigKey::PopulatesMetaFields.as_ref())).unwrap()
- }
-
- fn record_key_fields(&self) -> Vec<String> {
- self.get_property(ConfigKey::RecordKeyFields.as_ref())
- .split(',')
- .map(str::to_string)
- .collect()
- }
-
- fn table_name(&self) -> String {
- self.get_property(ConfigKey::TableName.as_ref()).to_string()
- }
-
- fn table_type(&self) -> TableType {
-
TableType::from_str(self.get_property(ConfigKey::TableType.as_ref())).unwrap()
- }
-
- fn table_version(&self) -> u32 {
-
u32::from_str(self.get_property(ConfigKey::TableVersion.as_ref())).unwrap()
- }
-
- fn timeline_layout_version(&self) -> u32 {
-
u32::from_str(self.get_property(ConfigKey::TimelineLayoutVersion.as_ref())).unwrap()
- }
-}
-
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::fs::canonicalize;
+ use std::panic;
use std::path::Path;
- use hudi_tests::TestTable;
+ use url::Url;
+ use hudi_tests::{assert_not, TestTable};
+
+ use crate::config::table::HudiTableConfig::{
+ BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields,
IsHiveStylePartitioning,
+ IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields,
PopulatesMetaFields,
+ PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
+ TimelineLayoutVersion,
+ };
use crate::storage::utils::join_url_segments;
- use crate::table::config::BaseFileFormat::Parquet;
- use crate::table::config::TableType::CopyOnWrite;
- use crate::table::metadata::ProvidesTableMetadata;
use crate::table::Table;
#[tokio::test]
@@ -411,33 +322,146 @@ mod tests {
}
#[tokio::test]
- async fn hudi_table_get_table_metadata() {
- let base_path = canonicalize(Path::new(
- "tests/data/table_metadata/sample_table_properties",
- ))
- .unwrap();
- let table = Table::new(base_path.to_str().unwrap(), HashMap::new())
- .await
- .unwrap();
- assert_eq!(table.base_file_format(), Parquet);
- assert_eq!(table.checksum(), 3761586722);
- assert_eq!(table.database_name(), "default");
- assert!(!table.drops_partition_fields());
- assert!(!table.is_hive_style_partitioning());
- assert!(!table.is_partition_path_urlencoded());
- assert!(table.is_partitioned());
+ async fn validate_invalid_table_props() {
+ let base_url =
+
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
+ .unwrap();
+ let table = Table::new(base_url.as_str(),
HashMap::new()).await.unwrap();
+ let configs = table.configs;
+ assert!(
+ configs.validate(BaseFileFormat).is_err(),
+ "required config is missing"
+ );
+ assert!(configs.validate(Checksum).is_err());
+ assert!(
+ configs.validate(DatabaseName).is_ok(),
+ "non-required config is missing"
+ );
+ assert!(configs.validate(DropsPartitionFields).is_err());
+ assert!(configs.validate(IsHiveStylePartitioning).is_err());
+ assert!(configs.validate(IsPartitionPathUrlencoded).is_err());
+ assert!(
+ configs.validate(KeyGeneratorClass).is_ok(),
+ "non-required config is missing"
+ );
+ assert!(
+ configs.validate(PartitionFields).is_ok(),
+ "non-required config is missing"
+ );
+ assert!(
+ configs.validate(PrecombineField).is_ok(),
+ "non-required config is missing"
+ );
+ assert!(
+ configs.validate(PopulatesMetaFields).is_ok(),
+ "non-required config is missing"
+ );
+ assert!(
+ configs.validate(RecordKeyFields).is_ok(),
+ "non-required config is missing"
+ );
+ assert!(
+ configs.validate(TableName).is_err(),
+ "required config is missing"
+ );
+ assert!(
+ configs.validate(TableType).is_ok(),
+ "Valid table type value"
+ );
+ assert!(configs.validate(TableVersion).is_err());
+ assert!(configs.validate(TimelineLayoutVersion).is_err());
+ }
+
+ #[tokio::test]
+ async fn get_invalid_table_props() {
+ let base_url =
+
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
+ .unwrap();
+ let table = Table::new(base_url.as_str(),
HashMap::new()).await.unwrap();
+ let configs = table.configs;
+ assert!(configs.get(BaseFileFormat).is_err());
+ assert!(configs.get(Checksum).is_err());
+ assert!(configs.get(DatabaseName).is_err());
+ assert!(configs.get(DropsPartitionFields).is_err());
+ assert!(configs.get(IsHiveStylePartitioning).is_err());
+ assert!(configs.get(IsPartitionPathUrlencoded).is_err());
+ assert!(configs.get(KeyGeneratorClass).is_err());
+ assert!(configs.get(PartitionFields).is_err());
+ assert!(configs.get(PrecombineField).is_err());
+ assert!(configs.get(PopulatesMetaFields).is_err());
+ assert!(configs.get(RecordKeyFields).is_err());
+ assert!(configs.get(TableName).is_err());
+ assert!(configs.get(TableType).is_ok(), "Valid table type value");
+ assert!(configs.get(TableVersion).is_err());
+ assert!(configs.get(TimelineLayoutVersion).is_err());
+ }
+
+ #[tokio::test]
+ async fn get_default_for_invalid_table_props() {
+ let base_url =
+
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
+ .unwrap();
+ let table = Table::new(base_url.as_str(),
HashMap::new()).await.unwrap();
+ let configs = table.configs;
+ assert!(panic::catch_unwind(||
configs.get_or_default(BaseFileFormat)).is_err());
+ assert!(panic::catch_unwind(||
configs.get_or_default(Checksum)).is_err());
+ assert_eq!(
+ configs.get_or_default(DatabaseName).to::<String>(),
+ "default"
+ );
+ assert_not!(configs.get_or_default(DropsPartitionFields).to::<bool>());
+ assert!(panic::catch_unwind(||
configs.get_or_default(IsHiveStylePartitioning)).is_err());
+ assert!(panic::catch_unwind(||
configs.get_or_default(IsPartitionPathUrlencoded)).is_err());
+ assert!(panic::catch_unwind(||
configs.get_or_default(KeyGeneratorClass)).is_err());
+ assert!(panic::catch_unwind(||
configs.get_or_default(PartitionFields)).is_err());
+ assert!(panic::catch_unwind(||
configs.get_or_default(PrecombineField)).is_err());
+ assert!(configs.get_or_default(PopulatesMetaFields).to::<bool>());
+ assert!(panic::catch_unwind(||
configs.get_or_default(RecordKeyFields)).is_err());
+ assert!(panic::catch_unwind(||
configs.get_or_default(TableName)).is_err());
+ assert_eq!(
+ configs.get_or_default(TableType).to::<String>(),
+ "COPY_ON_WRITE"
+ );
+ assert!(panic::catch_unwind(||
configs.get_or_default(TableVersion)).is_err());
+ assert!(panic::catch_unwind(||
configs.get_or_default(TimelineLayoutVersion)).is_err());
+ }
+
+ #[tokio::test]
+ async fn get_valid_table_props() {
+ let base_url =
+
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
+ .unwrap();
+ let table = Table::new(base_url.as_str(),
HashMap::new()).await.unwrap();
+ let configs = table.configs;
+ assert_eq!(
+ configs.get(BaseFileFormat).unwrap().to::<String>(),
+ "parquet"
+ );
+ assert_eq!(configs.get(Checksum).unwrap().to::<isize>(), 3761586722);
+ assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "db");
+ assert!(!configs.get(DropsPartitionFields).unwrap().to::<bool>());
+ assert!(!configs.get(IsHiveStylePartitioning).unwrap().to::<bool>());
+ assert!(!configs.get(IsPartitionPathUrlencoded).unwrap().to::<bool>());
assert_eq!(
- table.key_generator_class(),
+ configs.get(KeyGeneratorClass).unwrap().to::<String>(),
"org.apache.hudi.keygen.SimpleKeyGenerator"
);
- assert_eq!(table.location(), base_path.to_str().unwrap());
- assert_eq!(table.partition_fields(), vec!["city"]);
- assert_eq!(table.precombine_field(), "ts");
- assert!(table.populates_meta_fields());
- assert_eq!(table.record_key_fields(), vec!["uuid"]);
- assert_eq!(table.table_name(), "trips");
- assert_eq!(table.table_type(), CopyOnWrite);
- assert_eq!(table.table_version(), 6);
- assert_eq!(table.timeline_layout_version(), 1);
+ assert_eq!(
+ configs.get(PartitionFields).unwrap().to::<Vec<String>>(),
+ vec!["city"]
+ );
+ assert_eq!(configs.get(PrecombineField).unwrap().to::<String>(), "ts");
+ assert!(configs.get(PopulatesMetaFields).unwrap().to::<bool>());
+ assert_eq!(
+ configs.get(RecordKeyFields).unwrap().to::<Vec<String>>(),
+ vec!["uuid"]
+ );
+ assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+ assert_eq!(
+ configs.get(TableType).unwrap().to::<String>(),
+ "COPY_ON_WRITE"
+ );
+ assert_eq!(configs.get(TableVersion).unwrap().to::<isize>(), 6);
+ assert_eq!(configs.get(TimelineLayoutVersion).unwrap().to::<isize>(),
1);
}
}
diff --git a/crates/core/src/table/timeline.rs
b/crates/core/src/table/timeline.rs
index 51601a4..d2d476d 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -29,6 +29,7 @@ use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
use url::Url;
+use crate::config::HudiConfigs;
use crate::storage::utils::split_filename;
use crate::storage::Storage;
@@ -76,7 +77,7 @@ impl Instant {
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct Timeline {
- props: Arc<HashMap<String, String>>,
+ configs: Arc<HudiConfigs>,
storage: Arc<Storage>,
pub instants: Vec<Instant>,
}
@@ -85,13 +86,13 @@ impl Timeline {
pub async fn new(
base_url: Arc<Url>,
storage_options: Arc<HashMap<String, String>>,
- props: Arc<HashMap<String, String>>,
+ configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, storage_options)?;
let instants = Self::load_completed_commit_instants(&storage).await?;
Ok(Self {
storage,
- props,
+ configs,
instants,
})
}
@@ -179,6 +180,7 @@ mod tests {
use hudi_tests::TestTable;
+ use crate::config::HudiConfigs;
use crate::table::timeline::{Instant, State, Timeline};
#[tokio::test]
@@ -187,7 +189,7 @@ mod tests {
let timeline = Timeline::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
- Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();
@@ -201,7 +203,7 @@ mod tests {
let timeline = Timeline::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
- Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();
@@ -222,7 +224,7 @@ mod tests {
let timeline = Timeline::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
- Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();
diff --git
a/crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
b/crates/core/tests/data/table_props_invalid/.hoodie/hoodie.properties
similarity index 51%
copy from
crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
copy to crates/core/tests/data/table_props_invalid/.hoodie/hoodie.properties
index 9c0c7e2..242d34f 100644
---
a/crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
+++ b/crates/core/tests/data/table_props_invalid/.hoodie/hoodie.properties
@@ -17,22 +17,11 @@
# under the License.
#
-hoodie.table.type=COPY_ON_WRITE
-hoodie.table.metadata.partitions=files
-hoodie.table.precombine.field=ts
-hoodie.table.partition.fields=city
-hoodie.archivelog.folder=archived
-hoodie.table.cdc.enabled=false
-hoodie.timeline.layout.version=1
-hoodie.table.checksum=3761586722
-hoodie.datasource.write.drop.partition.columns=false
-hoodie.table.recordkey.fields=uuid
-hoodie.table.name=trips
-hoodie.partition.metafile.use.base.format=false
-hoodie.datasource.write.hive_style_partitioning=false
-hoodie.table.metadata.partitions.inflight=
-hoodie.populate.meta.fields=true
-hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
-hoodie.table.base.file.format=PARQUET
-hoodie.datasource.write.partitionpath.urlencode=false
-hoodie.table.version=6
+hoodie.table.type=cow
+hoodie.table.base.file.format=foo
+hoodie.table.checksum=bar
+hoodie.table.version=baz
+hoodie.timeline.layout.version=_
+hoodie.datasource.write.drop.partition.columns=1
+hoodie.datasource.write.hive_style_partitioning=0
+hoodie.datasource.write.partitionpath.urlencode=-1
diff --git
a/crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
b/crates/core/tests/data/table_props_valid/.hoodie/hoodie.properties
similarity index 96%
rename from
crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
rename to crates/core/tests/data/table_props_valid/.hoodie/hoodie.properties
index 9c0c7e2..d1e5ac6 100644
---
a/crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
+++ b/crates/core/tests/data/table_props_valid/.hoodie/hoodie.properties
@@ -17,7 +17,7 @@
# under the License.
#
-hoodie.table.type=COPY_ON_WRITE
+hoodie.table.type=copy_on_write
hoodie.table.metadata.partitions=files
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=city
@@ -28,6 +28,7 @@ hoodie.table.checksum=3761586722
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.recordkey.fields=uuid
hoodie.table.name=trips
+hoodie.database.name=db
hoodie.partition.metafile.use.base.format=false
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.metadata.partitions.inflight=
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 3b0a73a..1435d94 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -38,8 +38,8 @@ use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::create_physical_expr;
use DataFusionError::Execution;
-use hudi_core::config::HudiConfig::ReadInputPartitions;
-use hudi_core::config::OptionsParser;
+use hudi_core::config::read::HudiReadConfig::InputPartitions;
+use hudi_core::config::ConfigParser;
use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
use hudi_core::HudiTable;
@@ -51,9 +51,9 @@ pub struct HudiDataSource {
impl HudiDataSource {
pub async fn new(base_uri: &str, options: HashMap<String, String>) ->
Result<Self> {
- let input_partitions = ReadInputPartitions
+ let input_partitions = InputPartitions
.parse_value_or_default(&options)
- .cast::<usize>();
+ .to::<usize>();
match HudiTable::new(base_uri, options).await {
Ok(t) => Ok(Self {
table: Arc::new(t),
@@ -136,15 +136,13 @@ mod tests {
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::ScalarValue;
- use hudi_core::config::HudiConfig::ReadInputPartitions;
- use hudi_tests::utils::get_bool_column;
+ use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_tests::TestTable::{
- V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
V6SimplekeygenNonhivestyle,
- V6TimebasedkeygenNonhivestyle,
+ V6ComplexkeygenHivestyle, V6Nonpartitioned,
V6SimplekeygenHivestyleNoMetafields,
+ V6SimplekeygenNonhivestyle, V6TimebasedkeygenNonhivestyle,
};
use hudi_tests::{utils, TestTable};
- use utils::{get_i32_column, get_str_column};
- use TestTable::V6ComplexkeygenHivestyle;
+ use utils::{get_bool_column, get_i32_column, get_str_column};
use crate::HudiDataSource;
@@ -215,7 +213,7 @@ mod tests {
println!(">>> testing for {}", test_table.as_ref());
let ctx = prepare_session_context(
test_table,
- &[(ReadInputPartitions.as_ref().to_string(), "2".to_string())],
+ &[(InputPartitions.as_ref().to_string(), "2".to_string())],
)
.await;
diff --git a/crates/tests/src/utils.rs b/crates/tests/src/utils.rs
index 0f7d44b..73fafd9 100644
--- a/crates/tests/src/utils.rs
+++ b/crates/tests/src/utils.rs
@@ -20,6 +20,20 @@
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, BooleanArray, Int32Array, StringArray};
+#[macro_export]
+macro_rules! assert_not {
+ ($cond:expr) => {
+ if $cond {
+ panic!("assertion failed: condition is true");
+ }
+ };
+ ($cond:expr, $($arg:tt)+) => {
+ if $cond {
+ panic!("assertion failed: condition is true: {}",
format_args!($($arg)+));
+ }
+ };
+}
+
#[macro_export]
macro_rules! assert_approx_eq {
($a:expr, $b:expr, $delta:expr) => {{