This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new f722508 fix: derive record merge strategy based on table configs (#260)
f722508 is described below
commit f72250808fbd45797fc2c6e81014f382533ba554
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jan 22 18:51:22 2025 -0600
fix: derive record merge strategy based on table configs (#260)
- Derive `hoodie.table.record.merge.strategy` from `hoodie.populate.meta.fields` and `hoodie.table.precombine.field`
- Remove the hard-coded default value
- Add test cases for a table with `hoodie.populate.meta.fields` = false
---
crates/core/src/config/table.rs | 74 +++++++++++++++++-
crates/core/src/merge/mod.rs | 6 --
crates/core/src/merge/record_merger.rs | 11 ---
crates/core/src/table/builder.rs | 3 +
crates/core/src/table/mod.rs | 24 +++++-
.../data/tables/mor/v6_complexkeygen_hivestyle.sql | 1 -
.../v6_simplekeygen_hivestyle_no_metafields.sql | 85 +++++++++++++++++++++
.../v6_simplekeygen_hivestyle_no_metafields.zip | Bin 0 -> 28077 bytes
.../tables/mor/v6_simplekeygen_nonhivestyle.sql | 1 -
...v6_simplekeygen_nonhivestyle_overwritetable.sql | 1 -
10 files changed, 182 insertions(+), 24 deletions(-)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 9c73b76..1960542 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -159,9 +159,6 @@ impl ConfigParser for HudiTableConfig {
Self::DropsPartitionFields =>
Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
- Self::RecordMergeStrategy => Some(HudiConfigValue::String(
- RecordMergeStrategyValue::default().as_ref().to_string(),
- )),
Self::TimelineTimezone => Some(HudiConfigValue::String(
TimelineTimezoneValue::UTC.as_ref().to_string(),
)),
@@ -238,6 +235,41 @@ impl ConfigParser for HudiTableConfig {
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
}
}
+
+ fn parse_value_or_default(&self, configs: &HashMap<String, String>) ->
Self::Output {
+ self.parse_value(configs).unwrap_or_else(|_| {
+ match self {
+ Self::RecordMergeStrategy => {
+ let populates_meta_fields =
HudiTableConfig::PopulatesMetaFields
+ .parse_value_or_default(configs)
+ .to::<bool>();
+ if !populates_meta_fields {
+ // When populatesMetaFields is false, meta fields such
as record key and
+ // partition path are null, the table is supposed to
be append-only.
+ return HudiConfigValue::String(
+
RecordMergeStrategyValue::AppendOnly.as_ref().to_string(),
+ );
+ }
+
+ if
!configs.contains_key(HudiTableConfig::PrecombineField.as_ref()) {
+ // When precombine field is not available, we treat
the table as append-only
+ return HudiConfigValue::String(
+
RecordMergeStrategyValue::AppendOnly.as_ref().to_string(),
+ );
+ }
+
+ return HudiConfigValue::String(
+ RecordMergeStrategyValue::OverwriteWithLatest
+ .as_ref()
+ .to_string(),
+ );
+ }
+ _ => self
+ .default_value()
+ .unwrap_or_else(|| panic!("No default value for config
'{}'", self.as_ref())),
+ }
+ })
+ }
}
/// Config value for [HudiTableConfig::TableType].
@@ -310,6 +342,7 @@ impl FromStr for TimelineTimezoneValue {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::config::HudiConfigs;
#[test]
fn create_table_type() {
@@ -438,4 +471,39 @@ mod tests {
"hoodie.table.name"
);
}
+
+ #[test]
+ fn test_derive_record_merger_strategy() {
+ let hudi_configs = HudiConfigs::new(vec![
+ (HudiTableConfig::PopulatesMetaFields, "false"),
+ (HudiTableConfig::PrecombineField, "ts"),
+ ]);
+ assert_eq!(
+ hudi_configs
+ .get_or_default(HudiTableConfig::RecordMergeStrategy)
+ .to::<String>(),
+ RecordMergeStrategyValue::AppendOnly.as_ref(),
+ "Should derive as append-only due to populatesMetaFields=false"
+ );
+
+ let hudi_configs =
HudiConfigs::new(vec![(HudiTableConfig::PopulatesMetaFields, "true")]);
+ assert_eq!(
+ hudi_configs
+ .get_or_default(HudiTableConfig::RecordMergeStrategy)
+ .to::<String>(),
+ RecordMergeStrategyValue::AppendOnly.as_ref(),
+ "Should derive as append-only due to missing precombine field"
+ );
+
+ let hudi_configs = HudiConfigs::new(vec![
+ (HudiTableConfig::PopulatesMetaFields, "true"),
+ (HudiTableConfig::PrecombineField, "ts"),
+ ]);
+ assert_eq!(
+ hudi_configs
+ .get_or_default(HudiTableConfig::RecordMergeStrategy)
+ .to::<String>(),
+ RecordMergeStrategyValue::OverwriteWithLatest.as_ref(),
+ );
+ }
}
diff --git a/crates/core/src/merge/mod.rs b/crates/core/src/merge/mod.rs
index 1d9eed3..bcf2257 100644
--- a/crates/core/src/merge/mod.rs
+++ b/crates/core/src/merge/mod.rs
@@ -33,12 +33,6 @@ pub enum RecordMergeStrategyValue {
OverwriteWithLatest,
}
-impl Default for RecordMergeStrategyValue {
- fn default() -> Self {
- Self::OverwriteWithLatest
- }
-}
-
impl FromStr for RecordMergeStrategyValue {
type Err = ConfigError;
diff --git a/crates/core/src/merge/record_merger.rs
b/crates/core/src/merge/record_merger.rs
index 3a1b6ac..74fc568 100644
--- a/crates/core/src/merge/record_merger.rs
+++ b/crates/core/src/merge/record_merger.rs
@@ -42,15 +42,6 @@ pub struct RecordMerger {
impl RecordMerger {
/// Validates the given [HudiConfigs] against the [RecordMergeStrategy].
- ///
- /// # Notes
- /// This should be ideally called during table creation. However, an empty
- /// table could have no precombine field being set, and we also want to
keep
- /// the default merge strategy as [OverwriteWithLatest] to fulfill the
- /// snapshot read semantics out-of-the-box. This would conflict with
- /// having no precombine field.
- ///
- /// TODO: We should derive merge strategy dynamically if not set by user.
pub fn validate_configs(hudi_configs: &HudiConfigs) -> ConfigResult<()> {
let merge_strategy = hudi_configs
.get_or_default(RecordMergeStrategy)
@@ -93,8 +84,6 @@ impl RecordMerger {
schema: &SchemaRef,
batches: &[RecordBatch],
) -> Result<RecordBatch> {
- Self::validate_configs(&self.hudi_configs)?;
-
if batches.is_empty() {
return Ok(RecordBatch::new_empty(schema.clone()));
}
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 853ef1b..35e805e 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -34,6 +34,7 @@ use crate::config::table::HudiTableConfig::{
use crate::config::util::{parse_data_for_options,
split_hudi_options_from_others};
use crate::config::{HudiConfigs, HUDI_CONF_DIR};
use crate::error::CoreError;
+use crate::merge::record_merger::RecordMerger;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::Table;
@@ -276,6 +277,8 @@ impl TableBuilder {
)));
}
+ RecordMerger::validate_configs(hudi_configs)?;
+
Ok(())
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 07c173e..6488573 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -1015,7 +1015,7 @@ mod tests {
#[tokio::test]
async fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
- for base_url in
&[SampleTable::V6SimplekeygenNonhivestyle.url_to_mor()] {
+ for base_url in SampleTable::V6SimplekeygenNonhivestyle.urls() {
let hudi_table = Table::new(base_url.path()).await?;
let commit_timestamps = hudi_table
.timeline
@@ -1038,6 +1038,28 @@ mod tests {
}
Ok(())
}
+
+ #[tokio::test]
+ async fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
+ for base_url in
SampleTable::V6SimplekeygenHivestyleNoMetafields.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(&[]).await?;
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
+
+ let sample_data =
SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![
+ (1, "Alice", false),
+ (2, "Bob", false),
+ (3, "Carol", true),
+ (4, "Diana", true),
+ ]
+ )
+ }
+ Ok(())
+ }
}
mod test_incremental_queries {
diff --git a/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
index 64f2efd..a8883bb 100644
--- a/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
+++ b/crates/tests/data/tables/mor/v6_complexkeygen_hivestyle.sql
@@ -43,7 +43,6 @@ CREATE TABLE v6_complexkeygen_hivestyle (
shortField SHORT
)
USING HUDI
- LOCATION '/opt/data/external_tables/v6_complexkeygen_hivestyle'
TBLPROPERTIES (
type = 'mor',
primaryKey = 'id,name',
diff --git
a/crates/tests/data/tables/mor/v6_simplekeygen_hivestyle_no_metafields.sql
b/crates/tests/data/tables/mor/v6_simplekeygen_hivestyle_no_metafields.sql
new file mode 100644
index 0000000..09cdcd3
--- /dev/null
+++ b/crates/tests/data/tables/mor/v6_simplekeygen_hivestyle_no_metafields.sql
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v6_simplekeygen_hivestyle_no_metafields (
+ id INT,
+ name STRING,
+ isActive BOOLEAN,
+ shortField SHORT,
+ intField INT,
+ longField LONG,
+ floatField FLOAT,
+ doubleField DOUBLE,
+ decimalField DECIMAL(10,5),
+ dateField DATE,
+ timestampField TIMESTAMP,
+ binaryField BINARY,
+ arrayField ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>, --
Array of structs
+ mapField MAP<STRING, STRUCT<map_field_value_struct_f1: DOUBLE,
map_field_value_struct_f2: BOOLEAN>>, -- Map with struct values
+ structField STRUCT<
+ field1: STRING,
+ field2: INT,
+ child_struct: STRUCT<
+ child_field1: DOUBLE,
+ child_field2: BOOLEAN
+ >
+ >,
+ byteField BYTE
+)
+USING HUDI
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ preCombineField = 'longField',
+ 'hoodie.metadata.enable' = 'false',
+ 'hoodie.datasource.write.hive_style_partitioning' = 'true',
+ 'hoodie.datasource.write.drop.partition.columns' = 'false',
+ 'hoodie.populate.meta.fields' = 'false',
+ 'hoodie.table.log.file.format' = 'PARQUET',
+ 'hoodie.logfile.data.block.format' = 'parquet',
+ 'hoodie.datasource.write.record.merger.impls' =
'org.apache.hudi.HoodieSparkRecordMerger',
+ 'hoodie.parquet.small.file.limit' = '0'
+)
+PARTITIONED BY (byteField);
+
+INSERT INTO v6_simplekeygen_hivestyle_no_metafields VALUES
+(1, 'Alice', false, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890,
CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP),
CAST('binary data' AS BINARY),
+ ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
+ MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
+ STRUCT('Alice', 30, STRUCT(123.456, true)),
+ 10
+),
+(2, 'Bob', false, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345,
CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP),
CAST('more binary data' AS BINARY),
+ ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
+ MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
+ STRUCT('Bob', 40, STRUCT(789.012, false)),
+ 20
+),
+(3, 'Carol', true, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222,
CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP),
CAST('even more binary data' AS BINARY),
+ ARRAY(STRUCT('black', 600), STRUCT('white', 700), STRUCT('pink', 800)),
+ MAP('key5', STRUCT(345.678, true), 'key6', STRUCT(654.321, false)),
+ STRUCT('Carol', 25, STRUCT(456.789, true)),
+ 10
+),
+(4, 'Diana', true, 500, 45000, 987654321, 4.0, 2.468, 65432.12345,
CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new
binary data' AS BINARY),
+ ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
+ MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
+ STRUCT('Diana', 50, STRUCT(987.654, true)),
+ 30
+);
diff --git
a/crates/tests/data/tables/mor/v6_simplekeygen_hivestyle_no_metafields.zip
b/crates/tests/data/tables/mor/v6_simplekeygen_hivestyle_no_metafields.zip
new file mode 100644
index 0000000..cd89070
Binary files /dev/null and
b/crates/tests/data/tables/mor/v6_simplekeygen_hivestyle_no_metafields.zip
differ
diff --git a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
index 1c7c3bc..3645002 100644
--- a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
+++ b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle.sql
@@ -43,7 +43,6 @@ CREATE TABLE v6_simplekeygen_nonhivestyle (
byteField BYTE
)
USING HUDI
- LOCATION '/opt/data/external_tables/v6_simplekeygen_nonhivestyle'
TBLPROPERTIES (
type = 'mor',
primaryKey = 'id',
diff --git
a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
index d86e5df..e9cd839 100644
---
a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
+++
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
@@ -43,7 +43,6 @@ CREATE TABLE v6_simplekeygen_nonhivestyle_overwritetable (
byteField BYTE
)
USING HUDI
- LOCATION
'/opt/data/external_tables/v6_simplekeygen_nonhivestyle_overwritetable'
TBLPROPERTIES (
type = 'mor',
primaryKey = 'id',