xushiyan commented on code in PR #612:
URL: https://github.com/apache/hudi-rs/pull/612#discussion_r3246365464
##########
crates/core/src/merge/mod.rs:
##########
@@ -16,15 +16,98 @@
* specific language governing permissions and limitations
* under the License.
*/
+#[cfg(test)]
+mod conformance_tests;
mod ordering;
-pub mod record_merger;
+pub mod record_batch_merger;
+use crate::Result;
+use crate::config::HudiConfigs;
use crate::config::error;
use crate::config::error::ConfigError;
use crate::config::error::ConfigError::InvalidValue;
+use crate::config::error::Result as ConfigResult;
+use crate::config::read::HudiReadConfig;
+use crate::config::table::HudiTableConfig::{
+ OrderingFields, PopulatesMetaFields, RecordMergeStrategy,
+};
+use crate::error::CoreError;
+use crate::file_group::record_batches::RecordBatches;
+use crate::merge::record_batch_merger::RecordBatchMerger;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
use std::str::FromStr;
+use std::sync::Arc;
use strum_macros::AsRefStr;
+/// Merges materialized data and delete batches into a single output batch.
+///
+/// Implementations are configured when constructed. Callers provide the union of
+/// base-file and log-file data batches plus delete batches; the merger returns a
+/// batch using [`Self::output_schema`]. Errors are returned for invalid
+/// merge configuration, missing required fields, or Arrow compute failures.
+///
+/// The `Send + Sync` bounds allow an `Arc<dyn RecordMerger>` (as returned by
+/// [`create_record_merger`]) to be shared across threads.
+pub trait RecordMerger: Send + Sync + std::fmt::Debug {
+ /// Merges the provided record batches into one output batch.
+ ///
+ /// The input must contain batches compatible with this merger's output
+ /// schema and configured Hudi merge strategy. Depending on that strategy,
+ /// implementations may apply deletes or deduplicate by record key using
+ /// ordering values. The returned batch uses [`Self::output_schema`].
+ ///
+ /// # Errors
+ ///
+ /// Returns an error for invalid merge configuration, missing required
+ /// fields, or Arrow compute failures.
+ fn merge(&self, inputs: RecordBatches) -> Result<RecordBatch>;
+
+ /// Returns the schema used by batches produced by this merger.
+ fn output_schema(&self) -> &SchemaRef;
+}
+
+/// Default record merger implementation name.
+///
+/// [`create_record_merger`] compares the configured
+/// `HudiReadConfig::RecordMergerImpl` value against this name after ASCII
+/// lowercasing, and builds a `RecordBatchMerger` when it matches.
+pub const DEFAULT_RECORD_MERGER_IMPL: &str = "record_batch";
+
+/// Creates a record merger from the configured merger implementation name.
+///
+/// Reads `HudiReadConfig::RecordMergerImpl` via `get_or_default` and matches
+/// it case-insensitively (ASCII only). Currently only
+/// [`DEFAULT_RECORD_MERGER_IMPL`] is recognized; it yields a
+/// [`RecordBatchMerger`] built from `schema` and `hudi_configs`.
+///
+/// # Errors
+///
+/// Returns `CoreError::Config(ConfigError::InvalidValue(..))` when the
+/// configured name matches no known implementation. The message quotes the
+/// name as configured, before lowercasing.
+///
+/// NOTE(review): surrounding whitespace is not trimmed, so a value such as
+/// `" record_batch"` is rejected; confirm that is intended.
+pub fn create_record_merger(
+ schema: SchemaRef,
+ hudi_configs: Arc<HudiConfigs>,
+) -> Result<Arc<dyn RecordMerger>> {
+ let impl_name: String = hudi_configs
+ .get_or_default(HudiReadConfig::RecordMergerImpl)
+ .into();
+ // Lowercase for matching only; the original spelling is kept for the error.
+ let normalized_impl_name = impl_name.to_ascii_lowercase();
+ match normalized_impl_name.as_str() {
+ DEFAULT_RECORD_MERGER_IMPL => Ok(Arc::new(RecordBatchMerger::new(schema, hudi_configs))),
+ _ => Err(CoreError::Config(ConfigError::InvalidValue(format!(
+ "unknown record_merger_impl: {impl_name}"
+ )))),
+ }
+}
+
+/// Validates merge-related Hudi configs shared by all record merger
implementations.
+pub fn validate_configs(hudi_configs: &HudiConfigs) -> ConfigResult<()> {
+ let merge_strategy: String =
hudi_configs.get_or_default(RecordMergeStrategy).into();
+ let merge_strategy = RecordMergeStrategyValue::from_str(&merge_strategy)?;
+
+ let populate_meta_fields: bool =
hudi_configs.get_or_default(PopulatesMetaFields).into();
+ if !populate_meta_fields && merge_strategy !=
RecordMergeStrategyValue::AppendOnly {
+ return Err(ConfigError::InvalidValue(format!(
+ "When {:?} is false, {:?} must be {:?}.",
+ PopulatesMetaFields,
+ RecordMergeStrategy,
+ RecordMergeStrategyValue::AppendOnly
+ )));
+ }
+
+ let precombine_field = hudi_configs.try_get(OrderingFields)?;
+ if precombine_field.is_none() && merge_strategy ==
RecordMergeStrategyValue::OverwriteWithLatest
Review Comment:
@yihua Currently, an ordering field must be present for merging when using
the overwrite-with-latest strategy.
But I can simply change it to allow the ordering field to be absent and fall
back to the commit time field. Then I'd rename the strategies to event-time
ordering and commit-time ordering (both implicitly doing overwrite-with-latest).
That would get both of these orderings supported.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]