wuchong commented on code in PR #2307:
URL: https://github.com/apache/fluss/pull/2307#discussion_r2679358110
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -1221,6 +1221,136 @@ void testVersionMergeEngineWithTypeTimestampLTZ9()
throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
+ @Test
+ void testComprehensiveAggregationFunctions() throws Exception {
+ // Test all 11 aggregate functions (each function tested once with
representative data type)
+ tEnv.executeSql(
+ "create table comprehensive_agg ("
+ + "id int not null primary key not enforced, "
+ // Numeric aggregations
+ + "sum_int int, "
+ + "sum_double double, "
+ + "product_int int, "
+ // Max/Min aggregations (representative types: int,
double, string,
+ // timestamp)
+ + "max_int int, "
+ + "max_timestamp timestamp(3), "
+ + "min_double double, "
+ + "min_string string, "
+ // Value selection aggregations (test with/without
nulls)
+ + "first_val int, "
+ + "first_val_non_null int, "
+ + "last_val int, "
+ + "last_val_non_null int, "
+ // Boolean aggregations
+ + "bool_and_val boolean, "
+ + "bool_or_val boolean, "
+ // String aggregation with custom delimiter
+ + "listagg_val string"
+ + ") with ("
+ + "'table.merge-engine' = 'aggregation', "
+ + "'fields.sum_int.agg' = 'sum', "
+ + "'fields.sum_double.agg' = 'sum', "
+ + "'fields.product_int.agg' = 'product', "
+ + "'fields.max_int.agg' = 'max', "
+ + "'fields.max_timestamp.agg' = 'max', "
+ + "'fields.min_double.agg' = 'min', "
+ + "'fields.min_string.agg' = 'min', "
+ + "'fields.first_val.agg' = 'first_value', "
+ + "'fields.first_val_non_null.agg' =
'first_value_ignore_nulls', "
+ + "'fields.last_val.agg' = 'last_value', "
+ + "'fields.last_val_non_null.agg' =
'last_value_ignore_nulls', "
+ + "'fields.bool_and_val.agg' = 'bool_and', "
+ + "'fields.bool_or_val.agg' = 'bool_or', "
+ + "'fields.listagg_val.agg' = 'listagg', "
+ + "'fields.listagg_val.agg.params.delimiter' = '|')");
Review Comment:
The previous approach `fields.<field-name>.agg.params.<param-name>` has
several drawbacks:
Since `fields.<field-name>.agg` is already designated to store the
aggregation function name, it cannot simultaneously serve as a parent object
for nested parameters. In most configuration systems, such as YAML or JSON, a
given key cannot be both a **leaf** (holding a scalar value) and a **branch**
(containing child keys) at the same time.
Here are a few alternative proposals, which are also shorter:
1. Use `'fields.<field-name>.params.<param>'`, for example:
`'fields.tags.params.delimiter' = ','`
2. Or use `'fields.<field-name>.<function-name>.<param>' = 'value'`, for
example:
`'fields.tags.listagg.delimiter' = ','`
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkAggFunctionParserTest.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import org.apache.fluss.metadata.AggFunctions;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FlinkAggFunctionParser}. */
+class FlinkAggFunctionParserTest {
+
+ @Test
+ void testParseNoAggFunction() {
+ Configuration options = new Configuration();
+ assertThat(FlinkAggFunctionParser.parseAggFunction("total",
options)).isEmpty();
+ }
+
+ @Test
+ void testParseFunctionWithoutParameters() {
+ Configuration options = new Configuration();
+ options.setString("fields.total.agg", "sum");
+
+ Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("total", options);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().getType()).isEqualTo(AggFunctionType.SUM);
+ assertThat(result.get().hasParameters()).isFalse();
+ }
+
+ @Test
+ void testParseFunctionWithParameters() {
+ Configuration options = new Configuration();
+ options.setString("fields.tags.agg", "listagg");
+ options.setString("fields.tags.agg.params.delimiter", ";");
+
+ Optional<AggFunction> result =
FlinkAggFunctionParser.parseAggFunction("tags", options);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
+ assertThat(result.get().getParameter("delimiter")).contains(";");
+ }
+
+ @Test
+ void testParseColumnNameIsolation() {
+ // Test that configurations for different columns don't interfere with
each other
+ Configuration options = new Configuration();
+ options.setString("fields.col1.agg", "sum");
+ options.setString("fields.col2.agg", "listagg");
+ options.setString("fields.col2.agg.params.delimiter", "|"); // This
should not affect col1
+
+ Optional<AggFunction> col1Func =
FlinkAggFunctionParser.parseAggFunction("col1", options);
+ Optional<AggFunction> col2Func =
FlinkAggFunctionParser.parseAggFunction("col2", options);
+ Optional<AggFunction> col3Func =
FlinkAggFunctionParser.parseAggFunction("col3", options);
+
+ // col1 should have SUM without parameters
+ assertThat(col1Func).isPresent();
+ assertThat(col1Func.get().getType()).isEqualTo(AggFunctionType.SUM);
+ assertThat(col1Func.get().hasParameters()).isFalse();
+
+ // col2 should have LISTAGG with delimiter parameter
+ assertThat(col2Func).isPresent();
+
assertThat(col2Func.get().getType()).isEqualTo(AggFunctionType.LISTAGG);
+ assertThat(col2Func.get().getParameter("delimiter")).contains("|");
+
+ // col3 should have no configuration
+ assertThat(col3Func).isEmpty();
+ }
+
+ @Test
+ void testParseInvalidFunctionName() {
+ Configuration options = new Configuration();
+ options.setString("fields.total.agg", "invalid_function");
+
+ assertThatThrownBy(() ->
FlinkAggFunctionParser.parseAggFunction("total", options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unknown aggregation function")
+ .hasMessageContaining("invalid_function");
+ }
+
+ @Test
+ void testParseEmptyFunctionName() {
+ Configuration options = new Configuration();
+ options.setString("fields.total.agg", "");
+
+ assertThatThrownBy(() ->
FlinkAggFunctionParser.parseAggFunction("total", options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Empty aggregation function name");
+ }
+
+ @Test
+ void testRoundTripConversion() {
+ // Test that parse and format are inverse operations
+ java.util.Map<String, String> options = new java.util.HashMap<>();
Review Comment:
Replace the fully qualified names with imports.
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java:
##########
@@ -415,6 +415,59 @@ void testFlinkMaterializedTableConversions() {
checkEqualsIgnoreSchema(convertedFlinkMaterializedTable,
expectedTable);
}
+ @Test
+ void testAggregationFunctionRoundTrip() {
+ // Test Flink → Fluss → Flink conversion preserves aggregation
functions
+ ResolvedSchema schema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical(
+ "id",
org.apache.flink.table.api.DataTypes.INT().notNull()),
+ Column.physical(
+ "sum_val",
org.apache.flink.table.api.DataTypes.INT()),
+ Column.physical(
+ "tags",
org.apache.flink.table.api.DataTypes.STRING())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("PK_id",
Collections.singletonList("id")));
+
+ Map<String, String> options = new HashMap<>();
+ options.put("table.merge-engine", "aggregation");
+ options.put("fields.sum_val.agg", "sum");
+ options.put("fields.tags.agg", "listagg");
+ options.put("fields.tags.agg.params.delimiter", "|");
+
+ CatalogTable flinkTable =
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(schema).build(),
+ "round trip test",
+ Collections.emptyList(),
+ options);
+
+ // Flink → Fluss
+ TableDescriptor flussTable =
+ FlinkConversions.toFlussTable(new
ResolvedCatalogTable(flinkTable, schema));
+
+ // Fluss → Flink
+ TablePath tablePath = TablePath.of("db", "table");
+ long currentMillis = System.currentTimeMillis();
+ TableInfo tableInfo =
+ TableInfo.of(
+ tablePath,
+ 1L,
+ 1,
+ flussTable.withBucketCount(1),
+ currentMillis,
+ currentMillis);
+ CatalogTable convertedFlinkTable = (CatalogTable)
FlinkConversions.toFlinkTable(tableInfo);
+
+ // Verify aggregation functions are preserved
+ assertThat(convertedFlinkTable.getOptions())
+ .containsEntry("table.merge-engine", "aggregation")
+ .containsEntry("fields.sum_val.agg", "sum")
+ .containsEntry("fields.tags.agg", "listagg")
+ .containsEntry("fields.tags.agg.params.delimiter", "|");
Review Comment:
Replace with
`assertThat(convertedFlinkTable.getOptions()).containsAllEntriesOf(options);`
to avoid declaring the map keys and values twice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]