TheR1sing3un commented on code in PR #340:
URL: https://github.com/apache/paimon-rust/pull/340#discussion_r3394134974


##########
crates/paimon/src/spec/aggregation.rs:
##########
@@ -0,0 +1,603 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use crate::spec::{DataField, DataType};
+
+// Merge-engine selector: `merge-engine=aggregation` turns the engine on.
+const MERGE_ENGINE_OPTION: &str = "merge-engine";
+const AGGREGATION_ENGINE: &str = "aggregation";
+
+// Fixed (non-templated) option keys.
+const FIELDS_DEFAULT_AGG_FUNCTION_OPTION: &str = "fields.default-aggregate-function";
+const AGGREGATION_REMOVE_RECORD_ON_DELETE_OPTION: &str = "aggregation.remove-record-on-delete";
+
+// `ignore-delete`, as a plain key and as a key suffix.
+const IGNORE_DELETE_OPTION: &str = "ignore-delete";
+const IGNORE_DELETE_SUFFIX: &str = ".ignore-delete";
+
+// Per-field keys are spelled `fields.<col><suffix>`.
+const FIELDS_PREFIX: &str = "fields.";
+const AGG_FUNCTION_SUFFIX: &str = ".aggregate-function";
+const LIST_AGG_DELIMITER_SUFFIX: &str = ".list-agg-delimiter";
+
+// Suffixes of aggregation knobs that the basic mode does not accept.
+const IGNORE_RETRACT_SUFFIX: &str = ".ignore-retract";
+const DISTINCT_SUFFIX: &str = ".distinct";
+const SEQUENCE_GROUP_SUFFIX: &str = ".sequence-group";
+const NESTED_KEY_SUFFIX: &str = ".nested-key";
+const COUNT_LIMIT_SUFFIX: &str = ".count-limit";
+
+/// Aggregation modes the Rust implementation currently understands.
+///
+/// There is a single mode today; richer upstream modes are rejected during
+/// option validation rather than modelled here.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub(crate) enum AggregationMode {
+    /// `merge-engine=aggregation` restricted to the basic option set.
+    Basic,
+}
+
+/// Read-only view over a table's options for the aggregation merge engine.
+///
+/// Only the basic mode is accepted: `merge-engine=aggregation` on a
+/// primary-key table, configured exclusively through these keys:
+/// - `fields.default-aggregate-function`
+/// - `fields.<col>.aggregate-function`
+/// - `fields.<col>.list-agg-delimiter`
+///
+/// Any other aggregation-specific option (`ignore-retract`, `distinct`,
+/// `nested-key`, `count-limit`, `aggregation.remove-record-on-delete`,
+/// `sequence-group`, `ignore-delete`) makes validation fail.  Retract rows
+/// (DELETE / UPDATE_BEFORE) are rejected at runtime by the merge function.
+#[derive(Clone, Copy, Debug)]
+pub(crate) struct AggregationConfig<'a> {
+    /// Borrowed table options, keyed by option name (e.g. `merge-engine`).
+    options: &'a HashMap<String, String>,
+}
+
+impl<'a> AggregationConfig<'a> {
+    /// Wraps a borrowed table-options map; no validation happens here.
+    pub(crate) fn new(options: &'a HashMap<String, String>) -> Self {
+        AggregationConfig { options }
+    }
+
+    /// Returns `true` when `merge-engine` is set to `aggregation`, compared
+    /// ASCII-case-insensitively.
+    pub(crate) fn is_enabled(&self) -> bool {
+        matches!(
+            self.options.get(MERGE_ENGINE_OPTION),
+            Some(engine) if engine.eq_ignore_ascii_case(AGGREGATION_ENGINE)
+        )
+    }
+
+    /// Validate options at CREATE TABLE time, using the schema's fields to
+    /// reject typo'd column names, unknown aggregate functions, and
+    /// function/type pairs that the runtime would refuse.
+    ///
+    /// Java upstream rejects unknown columns and unknown function names in
+    /// `SchemaValidation.validateFieldsPrefix` + 
`validateMergeFunctionFactory`;
+    /// the function/type compatibility check is stricter than Java, which
+    /// defers it to `FieldAggregatorFactory#create` at runtime.  Catching all
+    /// three at CREATE TABLE keeps invalid metadata from being persisted.
+    pub(crate) fn validate_create_mode(
+        &self,
+        has_primary_keys: bool,
+        fields: &[DataField],
+    ) -> crate::Result<Option<AggregationMode>> {
+        let mode = match self.validated_mode(has_primary_keys) {
+            Ok(mode) => mode,
+            Err(unsupported_options) => {
+                return Err(crate::Error::ConfigInvalid {
+                    message: format!(
+                        "merge-engine=aggregation only supports the basic mode 
in this build; unsupported options: {}",
+                        unsupported_options.join(", ")
+                    ),
+                });
+            }
+        };
+        if mode.is_some() {
+            self.validate_field_scoped_options(fields)?;
+        }
+        Ok(mode)
+    }
+
+    /// Check aggregation options when a table is opened for reading or writing.
+    ///
+    /// Uses the same mode detection as create-time validation, but reports
+    /// unsupported options as `Unsupported` (naming `table_name`) and performs
+    /// no schema-aware field checks.
+    pub(crate) fn validate_runtime_mode(
+        &self,
+        has_primary_keys: bool,
+        table_name: &str,
+    ) -> crate::Result<Option<AggregationMode>> {
+        self.validated_mode(has_primary_keys)
+            .map_err(|rejected| crate::Error::Unsupported {
+                message: format!(
+                    "Table '{table_name}' uses merge-engine=aggregation options not supported by this build: {}",
+                    rejected.join(", ")
+                ),
+            })
+    }
+
+    /// Mode detection shared by create-time and runtime validation.
+    ///
+    /// * `Ok(None)`: aggregation does not apply (no primary keys, or another
+    ///   merge engine is configured).
+    /// * `Ok(Some(Basic))`: aggregation applies and every option is supported.
+    /// * `Err(keys)`: the sorted unsupported option keys.
+    fn validated_mode(
+        &self,
+        has_primary_keys: bool,
+    ) -> std::result::Result<Option<AggregationMode>, Vec<String>> {
+        if has_primary_keys && self.is_enabled() {
+            let rejected = self.unsupported_option_keys();
+            if rejected.is_empty() {
+                Ok(Some(AggregationMode::Basic))
+            } else {
+                Err(rejected)
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Collects every configured option key that the basic mode rejects.
+    fn unsupported_option_keys(&self) -> Vec<String> {
+        let mut rejected: Vec<String> = self
+            .options
+            .keys()
+            .filter_map(|key| is_unsupported_aggregation_option(key).then(|| key.clone()))
+            .collect();
+        // HashMap iteration order is arbitrary; sort so error messages are stable.
+        // Keys are unique, so an unstable sort yields the same order.
+        rejected.sort_unstable();
+        rejected
+    }
+
+    /// Per-field aggregate function configured via 
`fields.<col>.aggregate-function`.
+    pub(crate) fn agg_function_for_field(&self, field_name: &str) -> 
Option<&str> {
+        let key = format!("{FIELDS_PREFIX}{field_name}{AGG_FUNCTION_SUFFIX}");
+        self.options.get(&key).map(String::as_str)
+    }
+
+    /// Table-wide fallback function from `fields.default-aggregate-function`,
+    /// or `None` when it is not configured.
+    pub(crate) fn default_agg_function(&self) -> Option<&str> {
+        let function = self.options.get(FIELDS_DEFAULT_AGG_FUNCTION_OPTION)?;
+        Some(function.as_str())
+    }
+
+    /// Schema-aware checks run by [`validate_create_mode`] once the engine is
+    /// confirmed active.  For every `fields.<col>.<known-suffix>` key
+    /// (currently `aggregate-function` and `list-agg-delimiter`):
+    /// * the `<col>` segment must name an existing schema field; this catches
+    ///   typo'd column names that would otherwise silently fall back to the
+    ///   default function / default delimiter at read time.
+    ///
+    /// For `aggregate-function` keys additionally:
+    /// * the function name must be one of the supported aggregators
+    /// * the function must accept the field's declared data type
+    ///
+    /// `fields.default-aggregate-function` only has its name validated;
+    /// per-column type compatibility for the default is deferred to runtime
+    /// because the default applies broadly across columns.
+    fn validate_field_scoped_options(&self, fields: &[DataField]) -> 
crate::Result<()> {
+        for (key, value) in self.options {
+            let Some((col, kind)) = parse_field_scoped_option_key(key) else {
+                continue;
+            };
+            let Some(field) = fields.iter().find(|f| f.name() == col) else {
+                let mut available: Vec<&str> = 
fields.iter().map(DataField::name).collect();
+                available.sort();
+                return Err(crate::Error::ConfigInvalid {
+                    message: format!(
+                        "Aggregation field '{col}' referenced by '{key}' is 
not declared in \
+                         the table schema; available columns: [{}]",
+                        available.join(", ")
+                    ),
+                });
+            };
+            if matches!(kind, FieldScopedOptionKind::AggregateFunction) {
+                validate_aggregator_for_type(value, col, field.data_type())?;

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to