This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 69367649 [FLINK-27626] Introduce pre-aggregated merge to table store
69367649 is described below
commit 69367649cba9af2d2628287d84ea48e254d315b0
Author: Hannankan <[email protected]>
AuthorDate: Mon Sep 19 17:17:39 2022 +0800
[FLINK-27626] Introduce pre-aggregated merge to table store
This closes #270
---
docs/content/docs/development/create-table.md | 42 ++
.../shortcodes/generated/core_configuration.html | 2 +-
.../flink/table/store/utils/RowDataUtils.java | 47 ++
.../flink/table/store/utils/RowDataUtilsTest.java | 30 +
.../store/connector/source/FlinkSourceBuilder.java | 13 +-
.../store/connector/PreAggregationITCase.java | 771 +++++++++++++++++++++
.../org/apache/flink/table/store/CoreOptions.java | 4 +-
.../compact/aggregate/AggregateMergeFunction.java | 121 ++++
.../compact/aggregate/FieldAggregator.java | 75 ++
.../compact/aggregate/FieldBoolAndAgg.java | 46 ++
.../compact/aggregate/FieldBoolOrAgg.java | 46 ++
.../aggregate/FieldLastNonNullValueAgg.java | 34 +
.../compact/aggregate/FieldLastValueAgg.java | 34 +
.../compact/aggregate/FieldListaggAgg.java | 60 ++
.../mergetree/compact/aggregate/FieldMaxAgg.java | 47 ++
.../mergetree/compact/aggregate/FieldMinAgg.java | 47 ++
.../mergetree/compact/aggregate/FieldSumAgg.java | 78 +++
.../table/ChangelogWithKeyFileStoreTable.java | 21 +-
.../compact/aggregate/FieldAggregatorTest.java | 115 +++
19 files changed, 1623 insertions(+), 10 deletions(-)
diff --git a/docs/content/docs/development/create-table.md
b/docs/content/docs/development/create-table.md
index 4a43fa7d..1b582c0f 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -275,6 +275,48 @@ __Note:__
- It is best not to have NULL values in the fields, NULL will not overwrite
data.
{{< /hint >}}
+## Pre-aggregate
+You can configure an aggregate function for each value field via table options:
+
+```sql
+CREATE TABLE MyTable (
+ product_id BIGINT,
+ price DOUBLE,
+ sales BIGINT,
+ PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+ 'merge-engine' = 'aggregation',
+ 'fields.price.aggregate-function'='max',
+ 'fields.sales.aggregate-function'='sum'
+);
+```
+For rows with the same primary key, each value field is aggregated
+with the latest incoming data according to its configured aggregate function.
+
+For example, the inputs:
+- <1, 23.0, 15>
+- <1, 30.2, 20>
+
+Output:
+- <1, 30.2, 35>
+
+### Supported Aggregate Functions and Related Data Types
+Supported aggregate functions include `sum`, `max/min`, `last_non_null_value`,
`last_value`, `listagg`, `bool_or/bool_and`.
+These functions support different data types.
+
+- `sum` supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE
data types.
+- `max/min` support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT,
DOUBLE, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ data types.
+- `last_non_null_value/last_value` support all data types.
+- `listagg` supports the STRING data type.
+- `bool_and/bool_or` support the BOOLEAN data type.
+
+{{< hint info >}}
+__Note:__
+- Pre-aggregate is only supported for tables with primary keys.
+- Pre-aggregate is not supported for streaming consuming.
+- Pre-aggregate currently only supports INSERT changes.
+{{< /hint >}}
+
## Append-only Table
Append-only tables are a performance feature that only accepts `INSERT_ONLY`
data to append to the storage instead of
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index b093a1a4..c01d1bc8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -144,7 +144,7 @@
<td><h5>merge-engine</h5></td>
<td style="word-wrap: break-word;">deduplicate</td>
<td><p>Enum</p></td>
- <td>Specify the merge engine for table with primary key.<br /><br
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last
row.</li><li>"partial-update": Partial update non-null fields.</li></ul></td>
+ <td>Specify the merge engine for table with primary key.<br /><br
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last
row.</li><li>"partial-update": Partial update non-null
fields.</li><li>"aggregation": Aggregate fields with same primary
key.</li></ul></td>
</tr>
<tr>
<td><h5>num-levels</h5></td>
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
index d95824ce..a38c0ad4 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.utils;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
@@ -27,6 +28,7 @@ import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryArrayData;
import org.apache.flink.table.data.binary.BinaryMapData;
import org.apache.flink.table.data.binary.BinaryRawValueData;
@@ -38,6 +40,7 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
@@ -295,4 +298,48 @@ public class RowDataUtils {
};
}
}
+
+ public static int compare(Object x, Object y, LogicalTypeRoot type) {
+ int ret;
+ switch (type) {
+ case DECIMAL:
+ DecimalData xDD = (DecimalData) x;
+ DecimalData yDD = (DecimalData) y;
+ assert xDD.scale() == yDD.scale() : "Inconsistent scale of
aggregate DecimalData!";
+ assert xDD.precision() == yDD.precision()
+ : "Inconsistent precision of aggregate DecimalData!";
+ ret = DecimalDataUtils.compare(xDD, yDD);
+ break;
+ case TINYINT:
+ ret = Byte.compare((byte) x, (byte) y);
+ break;
+ case SMALLINT:
+ ret = Short.compare((short) x, (short) y);
+ break;
+ case INTEGER:
+ case DATE:
+ ret = Integer.compare((int) x, (int) y);
+ break;
+ case BIGINT:
+ ret = Long.compare((long) x, (long) y);
+ break;
+ case FLOAT:
+ ret = Float.compare((float) x, (float) y);
+ break;
+ case DOUBLE:
+ ret = Double.compare((double) x, (double) y);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ TimestampData xDD1 = (TimestampData) x;
+ TimestampData yDD1 = (TimestampData) y;
+ ret = xDD1.compareTo(yDD1);
+ break;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ throw new UnsupportedOperationException();
+ default:
+ throw new IllegalArgumentException();
+ }
+ return ret;
+ }
}
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
index 63e7670c..66194672 100644
---
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
@@ -23,14 +23,20 @@ import
org.apache.flink.connector.datagen.table.RandomGeneratorVisitor;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link RowDataUtils}. */
@@ -106,4 +112,28 @@ public class RowDataUtilsTest {
private BinaryRowData toBinary(RowData row) {
return serializer.toBinaryRow(row).copy();
}
+
+ @Test
+ public void testCompare() {
+ // test DECIMAL data type
+ DecimalData xDecimalData = DecimalData.fromBigDecimal(new
BigDecimal("12.34"), 4, 2);
+ DecimalData yDecimalData = DecimalData.fromBigDecimal(new
BigDecimal("13.14"), 4, 2);
+ assertThat(RowDataUtils.compare(xDecimalData, yDecimalData,
LogicalTypeRoot.DECIMAL))
+ .isLessThan(0);
+
+ // test DOUBLE data type
+ double xDouble = 13.14;
+ double yDouble = 12.13;
+ assertThat(RowDataUtils.compare(xDouble, yDouble,
LogicalTypeRoot.DOUBLE)).isGreaterThan(0);
+
+ // test TIMESTAMP_WITHOUT_TIME_ZONE data type
+ TimestampData xTimestampData =
TimestampData.fromLocalDateTime(LocalDateTime.now());
+ TimestampData yTimestampData =
TimestampData.fromTimestamp(xTimestampData.toTimestamp());
+ assertThat(
+ RowDataUtils.compare(
+ xTimestampData,
+ yTimestampData,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE))
+ .isEqualTo(0);
+ }
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index c3f565a6..e975796c 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
+import java.util.HashMap;
import java.util.Optional;
import static
org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
@@ -128,10 +129,18 @@ public class FlinkSourceBuilder {
private Source<RowData, ?, ?> buildSource() {
if (isContinuous) {
// TODO move validation to a dedicated method
+ MergeEngine mergeEngine = conf.get(MERGE_ENGINE);
+ HashMap<MergeEngine, String> mergeEngineDesc =
+ new HashMap<MergeEngine, String>() {
+ {
+ put(MergeEngine.PARTIAL_UPDATE, "Partial update");
+ put(MergeEngine.AGGREGATE, "Pre-aggregate");
+ }
+ };
if (table.schema().primaryKeys().size() > 0
- && conf.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
+ && mergeEngineDesc.containsKey(mergeEngine)) {
throw new ValidationException(
- "Partial update continuous reading is not supported.");
+ mergeEngineDesc.get(mergeEngine) + " continuous
reading is not supported.");
}
LogStartupMode startupMode = conf.get(LOG_SCAN);
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
new file mode 100644
index 00000000..c0cfa506
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for pre-aggregation. */
+public class PreAggregationITCase {
+ /** ITCase for bool_or/bool_and aggregate function. */
+ public static class BoolOrAndAggregation extends FileStoreTableITCase {
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T7 ("
+ + "j INT, k INT, "
+ + "a BOOLEAN, "
+ + "b BOOLEAN,"
+ + "PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ + "'fields.a.aggregate-function'='bool_or',"
+ + "'fields.b.aggregate-function'='bool_and'"
+ + ");");
+ }
+
+ @Test
+ public void testMergeInMemory() {
+ batchSql(
+ "INSERT INTO T7 VALUES "
+ + "(1, 2, CAST('TRUE' AS BOOLEAN), CAST('TRUE' AS
BOOLEAN)),"
+ + "(1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS
BOOLEAN)), "
+ + "(1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE'
AS BOOLEAN))");
+ List<Row> result = batchSql("SELECT * FROM T7");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, true,
false));
+ }
+
+ @Test
+ public void testMergeRead() {
+ batchSql(
+ "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS BOOLEAN),
CAST('TRUE' AS BOOLEAN))");
+ batchSql("INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN),
CAST(NULL AS BOOLEAN))");
+ batchSql(
+ "INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN),
CAST('FALSE' AS BOOLEAN))");
+
+ List<Row> result = batchSql("SELECT * FROM T7");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, true,
false));
+ }
+
+ @Test
+ public void testMergeCompaction() {
+ // Wait compaction
+ batchSql("ALTER TABLE T7 SET ('commit.force-compact'='true')");
+
+ // key 1 2
+ batchSql(
+ "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS BOOLEAN),
CAST('TRUE' AS BOOLEAN))");
+ batchSql("INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN),
CAST(NULL AS BOOLEAN))");
+ batchSql(
+ "INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN),
CAST('FALSE' AS BOOLEAN))");
+
+ // key 1 3
+ batchSql(
+ "INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS BOOLEAN),
CAST('TRUE' AS BOOLEAN))");
+ batchSql("INSERT INTO T7 VALUES (1, 3, CAST(NULL AS BOOLEAN),
CAST(NULL AS BOOLEAN))");
+ batchSql(
+ "INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS BOOLEAN),
CAST('TRUE' AS BOOLEAN))");
+
+ assertThat(batchSql("SELECT * FROM T7"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 2, true, false), Row.of(1, 3, false,
true));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("T7").execute().print(),
+ "Pre-aggregate continuous reading is not supported");
+ }
+ }
+
+ /** ITCase for listagg aggregate function. */
+ public static class ListAggAggregation extends FileStoreTableITCase {
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T6 ("
+ + "j INT, k INT, "
+ + "a STRING, "
+ + "PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ + "'fields.a.aggregate-function'='listagg'"
+ + ");");
+ }
+
+ @Test
+ public void testMergeInMemory() {
+ batchSql(
+ "INSERT INTO T6 VALUES "
+ + "(1, 2, 'first line'),"
+ + "(1, 2, CAST(NULL AS STRING)), "
+ + "(1, 2, 'second line')");
+ List<Row> result = batchSql("SELECT * FROM T6");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, "first
line,second line"));
+ }
+
+ @Test
+ public void testMergeRead() {
+ batchSql("INSERT INTO T6 VALUES (1, 2, 'first line')");
+ batchSql("INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))");
+ batchSql("INSERT INTO T6 VALUES (1, 2, 'second line')");
+
+ List<Row> result = batchSql("SELECT * FROM T6");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, "first
line,second line"));
+ }
+
+ @Test
+ public void testMergeCompaction() {
+ // Wait compaction
+ batchSql("ALTER TABLE T6 SET ('commit.force-compact'='true')");
+
+ // key 1 2
+ batchSql("INSERT INTO T6 VALUES (1, 2, 'first line')");
+ batchSql("INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))");
+ batchSql("INSERT INTO T6 VALUES (1, 2, 'second line')");
+
+ // key 1 3
+ batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))");
+ batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))");
+ batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))");
+
+ assertThat(batchSql("SELECT * FROM T6"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 2, "first line,second line"), Row.of(1,
3, null));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("T6").execute().print(),
+ "Pre-aggregate continuous reading is not supported");
+ }
+ }
+
+ /** ITCase for last value aggregate function. */
+ public static class LastValueAggregation extends FileStoreTableITCase {
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T5 ("
+ + "j INT, k INT, "
+ + "a INT, "
+ + "i DATE,"
+ + "PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ + "'fields.a.aggregate-function'='last_value', "
+ + "'fields.i.aggregate-function'='last_value'"
+ + ");");
+ }
+
+ @Test
+ public void testMergeInMemory() {
+ batchSql(
+ "INSERT INTO T5 VALUES "
+ + "(1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS
DATE)),"
+ + "(1, 2, 2, CAST('2020-01-02' AS DATE)), "
+ + "(1, 2, 3, CAST(NULL AS DATE))");
+ List<Row> result = batchSql("SELECT * FROM T5");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3,
null));
+ }
+
+ @Test
+ public void testMergeRead() {
+ batchSql("INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT),
CAST('2020-01-01' AS DATE))");
+ batchSql("INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS
DATE))");
+ batchSql("INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))");
+
+ List<Row> result = batchSql("SELECT * FROM T5");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3,
null));
+ }
+
+ @Test
+ public void testMergeCompaction() {
+ // Wait compaction
+ batchSql("ALTER TABLE T5 SET ('commit.force-compact'='true')");
+
+ // key 1 2
+ batchSql("INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT),
CAST('2020-01-01' AS DATE))");
+ batchSql("INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS
DATE))");
+ batchSql("INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))");
+
+ // key 1 3
+ batchSql("INSERT INTO T5 VALUES (1, 3, 3, CAST('2020-01-01' AS
DATE))");
+ batchSql("INSERT INTO T5 VALUES (1, 3, 2, CAST(NULL AS DATE))");
+ batchSql("INSERT INTO T5 VALUES (1, 3, CAST(NULL AS INT),
CAST('2022-01-02' AS DATE))");
+
+ assertThat(batchSql("SELECT * FROM T5"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 2, 3, null), Row.of(1, 3, null,
LocalDate.of(2022, 1, 2)));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("T5").execute().print(),
+ "Pre-aggregate continuous reading is not supported");
+ }
+ }
+
+ /** ITCase for last non-null value aggregate function. */
+ public static class LastNonNullValueAggregation extends
FileStoreTableITCase {
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T4 ("
+ + "j INT, k INT, "
+ + "a INT, "
+ + "i DATE,"
+ + "PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ +
"'fields.a.aggregate-function'='last_non_null_value', "
+ +
"'fields.i.aggregate-function'='last_non_null_value'"
+ + ");");
+ }
+
+ @Test
+ public void testMergeInMemory() {
+ batchSql(
+ "INSERT INTO T4 VALUES "
+ + "(1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS
DATE)),"
+ + "(1, 2, 2, CAST('2020-01-02' AS DATE)), "
+ + "(1, 2, 3, CAST(NULL AS DATE))");
+ List<Row> result = batchSql("SELECT * FROM T4");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3,
LocalDate.of(2020, 1, 2)));
+ }
+
+ @Test
+ public void testMergeRead() {
+ batchSql("INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT),
CAST('2020-01-01' AS DATE))");
+ batchSql("INSERT INTO T4 VALUES (1, 2, 2, CAST('2020-01-02' AS
DATE))");
+ batchSql("INSERT INTO T4 VALUES (1, 2, 3, CAST(NULL AS DATE))");
+
+ List<Row> result = batchSql("SELECT * FROM T4");
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3,
LocalDate.of(2020, 1, 2)));
+ }
+
+ @Test
+ public void testMergeCompaction() {
+ // Wait compaction
+ batchSql("ALTER TABLE T4 SET ('commit.force-compact'='true')");
+
+ // key 1 2
+ batchSql("INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT),
CAST('2020-01-01' AS DATE))");
+ batchSql("INSERT INTO T4 VALUES (1, 2, 2, CAST('2020-01-02' AS
DATE))");
+ batchSql("INSERT INTO T4 VALUES (1, 2, 3, CAST(NULL AS DATE))");
+
+ // key 1 3
+ batchSql("INSERT INTO T4 VALUES (1, 3, 3, CAST('2020-01-01' AS
DATE))");
+ batchSql("INSERT INTO T4 VALUES (1, 3, 2, CAST(NULL AS DATE))");
+ batchSql("INSERT INTO T4 VALUES (1, 3, CAST(NULL AS INT),
CAST('2022-01-02' AS DATE))");
+
+ assertThat(batchSql("SELECT * FROM T4"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 2, 3, LocalDate.of(2020, 1, 2)),
+ Row.of(1, 3, 2, LocalDate.of(2022, 1, 2)));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("T4").execute().print(),
+ "Pre-aggregate continuous reading is not supported");
+ }
+ }
+
+ /** ITCase for min aggregate function. */
+ public static class MinAggregation extends FileStoreTableITCase {
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T3 ("
+ + "j INT, k INT, "
+ + "a INT, "
+ + "b Decimal(4,2), "
+ + "c TINYINT,"
+ + "d SMALLINT,"
+ + "e BIGINT,"
+ + "f FLOAT,"
+ + "h DOUBLE,"
+ + "i DATE,"
+ + "l TIMESTAMP,"
+ + "PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ + "'fields.a.aggregate-function'='min', "
+ + "'fields.b.aggregate-function'='min', "
+ + "'fields.c.aggregate-function'='min', "
+ + "'fields.d.aggregate-function'='min', "
+ + "'fields.e.aggregate-function'='min', "
+ + "'fields.f.aggregate-function'='min',"
+ + "'fields.h.aggregate-function'='min',"
+ + "'fields.i.aggregate-function'='min',"
+ + "'fields.l.aggregate-function'='min'"
+ + ");");
+ }
+
+ @Test
+ public void testMergeInMemory() {
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 2, CAST(NULL AS INT), 1.01, CAST(-1 AS
TINYINT), CAST(-1 AS SMALLINT), "
+ + "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS
DOUBLE), CAST('2020-01-01' AS DATE), "
+ + "CAST('0001-01-01 01:01:01' AS TIMESTAMP)),"
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), "
+ + "CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS
DOUBLE), CAST('2020-01-02' AS DATE), "
+ + "CAST('0002-01-01 01:01:01' AS TIMESTAMP)), "
+ + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), "
+ + "CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS
DOUBLE), CAST('2022-01-02' AS DATE), "
+ + "CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+ List<Row> result = batchSql("SELECT * FROM T3");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 2,
+ new BigDecimal("1.01"),
+ (byte) -1,
+ (short) -1,
+ (long) 1000,
+ (float) -1.11,
+ -1.11,
+ LocalDate.of(2020, 1, 1),
+ LocalDateTime.of(1, 1, 1, 1, 1, 1)));
+ }
+
+ @Test
+ public void testMergeRead() {
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
+ + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
+ + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+ List<Row> result = batchSql("SELECT * FROM T3");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 2,
+ new BigDecimal("1.01"),
+ (byte) -1,
+ (short) -1,
+ (long) 1000,
+ (float) -1.11,
+ -1.11,
+ LocalDate.of(2020, 1, 1),
+ LocalDateTime.of(1, 1, 1, 1, 1, 1)));
+ }
+
+ @Test
+ public void testMergeCompaction() {
+ // Wait compaction
+ batchSql("ALTER TABLE T3 SET ('commit.force-compact'='true')");
+
+ // key 1 2
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
+ + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
+ + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+ // key 1 3
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
+ + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T3 VALUES "
+ + "(1, 3, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
+ + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+ assertThat(batchSql("SELECT * FROM T3"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 2,
+ new BigDecimal("1.01"),
+ (byte) -1,
+ (short) -1,
+ (long) 1000,
+ (float) -1.11,
+ -1.11,
+ LocalDate.of(2020, 1, 1),
+ LocalDateTime.of(1, 1, 1, 1, 1, 1)),
+ Row.of(
+ 1,
+ 3,
+ 3,
+ new BigDecimal("1.01"),
+ (byte) -1,
+ (short) -1,
+ (long) 1000,
+ (float) -1.11,
+ -1.11,
+ LocalDate.of(2020, 1, 1),
+ LocalDateTime.of(1, 1, 1, 1, 1, 1)));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("T3").execute().print(),
+ "Pre-aggregate continuous reading is not supported");
+ }
+ }
+
+ /** ITCase for max aggregate function. */
+ public static class MaxAggregation extends FileStoreTableITCase {
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T2 ("
+ + "j INT, k INT, "
+ + "a INT, "
+ + "b Decimal(4,2), "
+ + "c TINYINT,"
+ + "d SMALLINT,"
+ + "e BIGINT,"
+ + "f FLOAT,"
+ + "h DOUBLE,"
+ + "i DATE,"
+ + "l TIMESTAMP,"
+ + "PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ + "'fields.a.aggregate-function'='max', "
+ + "'fields.b.aggregate-function'='max', "
+ + "'fields.c.aggregate-function'='max', "
+ + "'fields.d.aggregate-function'='max', "
+ + "'fields.e.aggregate-function'='max', "
+ + "'fields.f.aggregate-function'='max',"
+ + "'fields.h.aggregate-function'='max',"
+ + "'fields.i.aggregate-function'='max',"
+ + "'fields.l.aggregate-function'='max'"
+ + ");");
+ }
+
+ @Test
+ public void testMergeInMemory() {
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), "
+ + "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS
DOUBLE), CAST('2020-01-01' AS DATE), "
+ + "CAST('0001-01-01 01:01:01' AS TIMESTAMP)),"
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
+ + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), "
+ + "CAST('0002-01-01 01:01:01' AS TIMESTAMP)), "
+ + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
+ + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), "
+ + "CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+ List<Row> result = batchSql("SELECT * FROM T2");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 3,
+ new BigDecimal("10.00"),
+ (byte) 2,
+ (short) 2,
+ (long) 10000000,
+ (float) 1.11,
+ 1.21,
+ LocalDate.of(2022, 1, 2),
+ LocalDateTime.of(2, 1, 1, 2, 0, 0)));
+ }
+
+ @Test
+ public void testMergeRead() {
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), "
+ + "CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+ + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS
DATE), "
+ + "CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), 0, "
+ + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), "
+ + "CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+ List<Row> result = batchSql("SELECT * FROM T2");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 3,
+ new BigDecimal("10.00"),
+ (byte) 2,
+ (short) 2,
+ (long) 10000000,
+ (float) 1.11,
+ 1.21,
+ LocalDate.of(2022, 1, 2),
+ LocalDateTime.of(2, 1, 1, 2, 0, 0)));
+ }
+
+ @Test
+ public void testMergeCompaction() {
+ // Wait compaction
+ batchSql("ALTER TABLE T2 SET ('commit.force-compact'='true')");
+
+ // key 1 2
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+ + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS
DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), 0, "
+ + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+ // key 1 3
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+ + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS
DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+ batchSql(
+ "INSERT INTO T2 VALUES "
+ + "(1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), 0, "
+ + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+ assertThat(batchSql("SELECT * FROM T2"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 3,
+ new BigDecimal("10.00"),
+ (byte) 2,
+ (short) 2,
+ (long) 10000000,
+ (float) 1.11,
+ 1.21,
+ LocalDate.of(2022, 1, 2),
+ LocalDateTime.of(2, 1, 1, 2, 0, 0)),
+ Row.of(
+ 1,
+ 3,
+ 6,
+ new BigDecimal("10.00"),
+ (byte) 2,
+ (short) 2,
+ (long) 10000000,
+ (float) 1.11,
+ 1.21,
+ LocalDate.of(2022, 1, 2),
+ LocalDateTime.of(2, 1, 1, 2, 0, 0)));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("T2").execute().print(),
+ "Pre-aggregate continuous reading is not supported");
+ }
+ }
+
+ /** ITCase for sum aggregate function. */
+ public static class SumAggregation extends FileStoreTableITCase {
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T1 ("
+ + "j INT, k INT, "
+ + "a INT, "
+ + "b Decimal(4,2), "
+ + "c TINYINT,"
+ + "d SMALLINT,"
+ + "e BIGINT,"
+ + "f FLOAT,"
+ + "h DOUBLE,"
+ + "PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ + "'fields.a.aggregate-function'='sum', "
+ + "'fields.b.aggregate-function'='sum', "
+ + "'fields.c.aggregate-function'='sum', "
+ + "'fields.d.aggregate-function'='sum', "
+ + "'fields.e.aggregate-function'='sum', "
+ + "'fields.f.aggregate-function'='sum',"
+ + "'fields.h.aggregate-function'='sum'"
+ + ");");
+ }
+
+ @Test
+ public void testMergeInMemory() {
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), "
+ + "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS
DOUBLE)),"
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), "
+ + "CAST(100000 AS BIGINT), -1.11, CAST(1.11 AS
DOUBLE)), "
+ + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), "
+ + "CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS
DOUBLE))");
+ List<Row> result = batchSql("SELECT * FROM T1");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 5,
+ new BigDecimal("12.11"),
+ (byte) 4,
+ (short) 2,
+ (long) 10101000,
+ (float) 0,
+ 1.11));
+ }
+
+ @Test
+ public void testMergeRead() {
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS
SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE))");
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
+ + "CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))");
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
+ + "-1.11, CAST(-1.11 AS DOUBLE))");
+
+ List<Row> result = batchSql("SELECT * FROM T1");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 6,
+ new BigDecimal("12.11"),
+ (byte) 4,
+ (short) 2,
+ (long) 10101000,
+ (float) 0,
+ 1.11));
+ }
+
+ @Test
+ public void testMergeCompaction() {
+ // Wait compaction
+ batchSql("ALTER TABLE T1 SET ('commit.force-compact'='true')");
+
+ // key 1 2
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS
SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE))");
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
+ + "CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))");
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
+ + "-1.11, CAST(-1.11 AS DOUBLE))");
+
+ // key 1 3
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 3, 2, 1.01, CAST(1 AS TINYINT), CAST(-1 AS
SMALLINT), CAST(1000 AS BIGINT), "
+ + "1.11, CAST(1.11 AS DOUBLE))");
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 3, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
+ + "CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))");
+ batchSql(
+ "INSERT INTO T1 VALUES "
+ + "(1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
+ + "-1.11, CAST(-1.11 AS DOUBLE))");
+
+ assertThat(batchSql("SELECT * FROM T1"))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1,
+ 2,
+ 6,
+ new BigDecimal("12.11"),
+ (byte) 4,
+ (short) 2,
+ (long) 10101000,
+ (float) 0,
+ 1.11),
+ Row.of(
+ 1,
+ 3,
+ 7,
+ new BigDecimal("12.11"),
+ (byte) 4,
+ (short) 2,
+ (long) 10101000,
+ (float) 0,
+ 1.11));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("T1").execute().print(),
+ "Pre-aggregate continuous reading is not supported");
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index ce10ad5f..31b6b1f2 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -482,7 +482,9 @@ public class CoreOptions implements Serializable {
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
- PARTIAL_UPDATE("partial-update", "Partial update non-null fields.");
+ PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
+
+ AGGREGATE("aggregation", "Aggregate fields with same primary key.");
private final String value;
private final String description;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
new file mode 100644
index 00000000..3fbb4bbd
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the
partial record,
+ * pre-aggregate non-null fields on merge.
+ */
+public class AggregateMergeFunction implements MergeFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RowData.FieldGetter[] getters;
+
+ private final RowAggregator rowAggregator;
+
+ private transient GenericRowData row;
+
+ public AggregateMergeFunction(RowData.FieldGetter[] getters, RowAggregator
rowAggregator) {
+ this.getters = getters;
+ this.rowAggregator = rowAggregator;
+ }
+
+ @Override
+ public void reset() {
+ this.row = new GenericRowData(getters.length);
+ }
+
+ @Override
+ public void add(KeyValue kv) {
+ checkArgument(
+ kv.valueKind() == RowKind.INSERT || kv.valueKind() ==
RowKind.UPDATE_AFTER,
+ "Pre-aggregate can not accept delete records!");
+ for (int i = 0; i < getters.length; i++) {
+ FieldAggregator fieldAggregator =
rowAggregator.getFieldAggregatorAtPos(i);
+ Object accumulator = getters[i].getFieldOrNull(row);
+ Object inputField = getters[i].getFieldOrNull(kv.value());
+ Object mergedField = fieldAggregator.agg(accumulator, inputField);
+ row.setField(i, mergedField);
+ }
+ }
+
+ @Nullable
+ @Override
+ public RowData getValue() {
+ return row;
+ }
+
+ @Override
+ public MergeFunction copy() {
+ // RowData.FieldGetter is thread safe
+ return new AggregateMergeFunction(getters, rowAggregator);
+ }
+
+ /** Provide an Aggregator for merge a new row data. */
+ public static class RowAggregator implements Serializable {
+ public static final String FIELDS = "fields";
+ public static final String AGG_FUNCTION = "aggregate-function";
+
+ private final FieldAggregator[] fieldAggregators;
+
+ public RowAggregator(
+ Configuration sqlConf,
+ List<String> fieldNames,
+ List<LogicalType> fieldTypes,
+ List<String> primaryKeys) {
+ fieldAggregators = new FieldAggregator[fieldNames.size()];
+ for (int i = 0; i < fieldNames.size(); i++) {
+ String fieldName = fieldNames.get(i);
+ LogicalType fieldType = fieldTypes.get(i);
+ // aggregate by primary keys, so they do not aggregate
+ boolean isPrimaryKey = primaryKeys.contains(fieldName);
+ String strAggFunc =
+ sqlConf.getString(
+ key(FIELDS + "." + fieldName + "." +
AGG_FUNCTION)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Get " + fieldName + "'s
aggregate function"));
+ fieldAggregators[i] =
+ FieldAggregator.createFieldAggregator(fieldType,
strAggFunc, isPrimaryKey);
+ }
+ }
+
+ public FieldAggregator getFieldAggregatorAtPos(int fieldPos) {
+ return fieldAggregators[fieldPos];
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
new file mode 100644
index 00000000..a9780eba
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+
+/** abstract class of aggregating a field of a row. */
+public abstract class FieldAggregator implements Serializable {
+ protected LogicalType fieldType;
+
+ public FieldAggregator(LogicalType logicalType) {
+ this.fieldType = logicalType;
+ }
+
+ static FieldAggregator createFieldAggregator(
+ LogicalType fieldType, String strAgg, boolean isPrimaryKey) {
+ final FieldAggregator fieldAggregator;
+ if (isPrimaryKey) {
+ fieldAggregator = new FieldLastValueAgg(fieldType);
+ } else {
+ // ordered by type root definition
+ switch (strAgg) {
+ case "sum":
+ fieldAggregator = new FieldSumAgg(fieldType);
+ break;
+ case "max":
+ fieldAggregator = new FieldMaxAgg(fieldType);
+ break;
+ case "min":
+ fieldAggregator = new FieldMinAgg(fieldType);
+ break;
+ case "last_non_null_value":
+ fieldAggregator = new FieldLastNonNullValueAgg(fieldType);
+ break;
+ case "last_value":
+ fieldAggregator = new FieldLastValueAgg(fieldType);
+ break;
+ case "listagg":
+ fieldAggregator = new FieldListaggAgg(fieldType);
+ break;
+ case "bool_or":
+ fieldAggregator = new FieldBoolOrAgg(fieldType);
+ break;
+ case "bool_and":
+ fieldAggregator = new FieldBoolAndAgg(fieldType);
+ break;
+ default:
+ throw new ValidationException(
+ "Use unsupported aggregation or spell aggregate
function incorrectly!");
+ }
+ }
+ return fieldAggregator;
+ }
+
+ abstract Object agg(Object accumulator, Object inputField);
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolAndAgg.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolAndAgg.java
new file mode 100644
index 00000000..07dcd0bd
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolAndAgg.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** bool_and aggregate a field of a row. */
+public class FieldBoolAndAgg extends FieldAggregator {
+ public FieldBoolAndAgg(LogicalType logicalType) {
+ super(logicalType);
+ }
+
+ @Override
+ Object agg(Object accumulator, Object inputField) {
+ Object boolAnd;
+
+ if (accumulator == null || inputField == null) {
+ boolAnd = (inputField == null) ? accumulator : inputField;
+ } else {
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ boolAnd = (boolean) accumulator && (boolean) inputField;
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ return boolAnd;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolOrAgg.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolOrAgg.java
new file mode 100644
index 00000000..73163f0b
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolOrAgg.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** bool_or aggregate a field of a row. */
+public class FieldBoolOrAgg extends FieldAggregator {
+ public FieldBoolOrAgg(LogicalType logicalType) {
+ super(logicalType);
+ }
+
+ @Override
+ Object agg(Object accumulator, Object inputField) {
+ Object boolOr;
+
+ if (accumulator == null || inputField == null) {
+ boolOr = (inputField == null) ? accumulator : inputField;
+ } else {
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ boolOr = (boolean) accumulator || (boolean) inputField;
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ return boolOr;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
new file mode 100644
index 00000000..9049808a
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** last non-null value aggregate a field of a row. */
+public class FieldLastNonNullValueAgg extends FieldAggregator {
+
+ public FieldLastNonNullValueAgg(LogicalType logicalType) {
+ super(logicalType);
+ }
+
+ @Override
+ Object agg(Object accumulator, Object inputField) {
+ return (inputField == null) ? accumulator : inputField;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastValueAgg.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastValueAgg.java
new file mode 100644
index 00000000..266b5a2b
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastValueAgg.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** last value aggregate a field of a row. */
+public class FieldLastValueAgg extends FieldAggregator {
+
+ public FieldLastValueAgg(LogicalType logicalType) {
+ super(logicalType);
+ }
+
+ @Override
+ Object agg(Object accumulator, Object inputField) {
+ return inputField;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java
new file mode 100644
index 00000000..69b10b5f
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** listagg aggregate a field of a row. */
+public class FieldListaggAgg extends FieldAggregator {
+ // TODO: make it configurable by with clause
+ public static final String DELIMITER = ",";
+
+ public FieldListaggAgg(LogicalType logicalType) {
+ super(logicalType);
+ }
+
+ @Override
+ Object agg(Object accumulator, Object inputField) {
+ Object concatenate;
+
+ if (inputField == null || accumulator == null) {
+ concatenate = (inputField == null) ? accumulator : inputField;
+ } else {
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case VARCHAR:
+ // TODO: ensure not VARCHAR(n)
+ StringData mergeFieldSD = (StringData) accumulator;
+ StringData inFieldSD = (StringData) inputField;
+ concatenate =
+ BinaryStringDataUtil.concat(
+ (BinaryStringData) mergeFieldSD,
+ new BinaryStringData(DELIMITER),
+ (BinaryStringData) inFieldSD);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ return concatenate;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMaxAgg.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMaxAgg.java
new file mode 100644
index 00000000..cffbc5a6
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMaxAgg.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.store.utils.RowDataUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+/** max aggregate a field of a row. */
+public class FieldMaxAgg extends FieldAggregator {
+ public FieldMaxAgg(LogicalType logicalType) {
+ super(logicalType);
+ }
+
+ @Override
+ Object agg(Object accumulator, Object inputField) {
+ Object max;
+
+ if (accumulator == null || inputField == null) {
+ max = (accumulator == null ? inputField : accumulator);
+ } else {
+ LogicalTypeRoot type = fieldType.getTypeRoot();
+ if (RowDataUtils.compare(accumulator, inputField, type) < 0) {
+ max = inputField;
+ } else {
+ max = accumulator;
+ }
+ }
+ return max;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMinAgg.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMinAgg.java
new file mode 100644
index 00000000..8f38112f
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMinAgg.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.store.utils.RowDataUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+/** min aggregate a field of a row. */
+public class FieldMinAgg extends FieldAggregator {
+ public FieldMinAgg(LogicalType logicalType) {
+ super(logicalType);
+ }
+
+ @Override
+ Object agg(Object accumulator, Object inputField) {
+ Object min;
+
+ if (accumulator == null || inputField == null) {
+ min = (accumulator == null ? inputField : accumulator);
+ } else {
+ LogicalTypeRoot type = fieldType.getTypeRoot();
+ if (RowDataUtils.compare(accumulator, inputField, type) < 0) {
+ min = accumulator;
+ } else {
+ min = inputField;
+ }
+ }
+ return min;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldSumAgg.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldSumAgg.java
new file mode 100644
index 00000000..234eb5fe
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldSumAgg.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** sum aggregate a field of a row. */
+public class FieldSumAgg extends FieldAggregator {
+ public FieldSumAgg(LogicalType logicalType) {
+ super(logicalType);
+ }
+
+ @Override
+ Object agg(Object accumulator, Object inputField) {
+ Object sum;
+
+ if (accumulator == null || inputField == null) {
+ sum = (accumulator == null ? inputField : accumulator);
+ } else {
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case DECIMAL:
+ DecimalData mergeFieldDD = (DecimalData) accumulator;
+ DecimalData inFieldDD = (DecimalData) inputField;
+ assert mergeFieldDD.scale() == inFieldDD.scale()
+ : "Inconsistent scale of aggregate DecimalData!";
+ assert mergeFieldDD.precision() == inFieldDD.precision()
+ : "Inconsistent precision of aggregate
DecimalData!";
+ sum =
+ DecimalDataUtils.add(
+ mergeFieldDD,
+ inFieldDD,
+ mergeFieldDD.precision(),
+ mergeFieldDD.scale());
+ break;
+ case TINYINT:
+ sum = (byte) ((byte) accumulator + (byte) inputField);
+ break;
+ case SMALLINT:
+ sum = (short) ((short) accumulator + (short) inputField);
+ break;
+ case INTEGER:
+ sum = (int) accumulator + (int) inputField;
+ break;
+ case BIGINT:
+ sum = (long) accumulator + (long) inputField;
+ break;
+ case FLOAT:
+ sum = (float) accumulator + (float) inputField;
+ break;
+ case DOUBLE:
+ sum = (double) accumulator + (double) inputField;
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ return sum;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 8a983479..b72cc634 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.WriteMode;
import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
+import
org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -72,20 +73,28 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
RowType rowType = tableSchema.logicalRowType();
Configuration conf = Configuration.fromMap(tableSchema.options());
CoreOptions.MergeEngine mergeEngine =
conf.get(CoreOptions.MERGE_ENGINE);
+ List<LogicalType> fieldTypes = rowType.getChildren();
+ RowData.FieldGetter[] fieldGetters = new
RowData.FieldGetter[fieldTypes.size()];
+ for (int i = 0; i < fieldTypes.size(); i++) {
+ fieldGetters[i] =
RowDataUtils.createNullCheckingFieldGetter(fieldTypes.get(i), i);
+ }
MergeFunction mergeFunction;
switch (mergeEngine) {
case DEDUPLICATE:
mergeFunction = new DeduplicateMergeFunction();
break;
case PARTIAL_UPDATE:
- List<LogicalType> fieldTypes = rowType.getChildren();
- RowData.FieldGetter[] fieldGetters = new
RowData.FieldGetter[fieldTypes.size()];
- for (int i = 0; i < fieldTypes.size(); i++) {
- fieldGetters[i] =
-
RowDataUtils.createNullCheckingFieldGetter(fieldTypes.get(i), i);
- }
mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
break;
+ case AGGREGATE:
+ List<String> fieldNames = tableSchema.fieldNames();
+ List<String> primaryKeys = tableSchema.primaryKeys();
+ mergeFunction =
+ new AggregateMergeFunction(
+ fieldGetters,
+ new AggregateMergeFunction.RowAggregator(
+ conf, fieldNames, fieldTypes,
primaryKeys));
+ break;
default:
throw new UnsupportedOperationException("Unsupported merge
engine: " + mergeEngine);
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregatorTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregatorTest.java
new file mode 100644
index 00000000..34ac0d44
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** test whether {@link FieldAggregator}' subclasses behaviors are expected. */
+public class FieldAggregatorTest {
+
+ @Test
+ public void testFieldBoolAndAgg() {
+ FieldBoolAndAgg fieldBoolAndAgg = new FieldBoolAndAgg(new
BooleanType());
+ Boolean accumulator = false;
+ Boolean inputField = true;
+ assertThat(fieldBoolAndAgg.agg(accumulator,
inputField)).isEqualTo(false);
+
+ accumulator = true;
+ inputField = true;
+ assertThat(fieldBoolAndAgg.agg(accumulator,
inputField)).isEqualTo(true);
+ }
+
+ @Test
+ public void testFieldBoolOrAgg() {
+ FieldBoolOrAgg fieldBoolOrAgg = new FieldBoolOrAgg(new BooleanType());
+ Boolean accumulator = false;
+ Boolean inputField = true;
+ assertThat(fieldBoolOrAgg.agg(accumulator,
inputField)).isEqualTo(true);
+
+ accumulator = false;
+ inputField = false;
+ assertThat(fieldBoolOrAgg.agg(accumulator,
inputField)).isEqualTo(false);
+ }
+
+ @Test
+ public void testFieldLastNonNullValueAgg() {
+ FieldLastNonNullValueAgg fieldLastNonNullValueAgg =
+ new FieldLastNonNullValueAgg(new IntType());
+ Integer accumulator = null;
+ Integer inputField = 1;
+ assertThat(fieldLastNonNullValueAgg.agg(accumulator,
inputField)).isEqualTo(1);
+
+ accumulator = 1;
+ inputField = null;
+ assertThat(fieldLastNonNullValueAgg.agg(accumulator,
inputField)).isEqualTo(1);
+ }
+
+ @Test
+ public void testFieldLastValueAgg() {
+ FieldLastValueAgg fieldLastValueAgg = new FieldLastValueAgg(new
IntType());
+ Integer accumulator = null;
+ Integer inputField = 1;
+ assertThat(fieldLastValueAgg.agg(accumulator,
inputField)).isEqualTo(1);
+
+ accumulator = 1;
+ inputField = null;
+ assertThat(fieldLastValueAgg.agg(accumulator,
inputField)).isEqualTo(null);
+ }
+
+ @Test
+ public void testFieldListaggAgg() {
+ FieldListaggAgg fieldListaggAgg = new FieldListaggAgg(new
VarCharType());
+ StringData accumulator = BinaryStringData.fromString("user1");
+ StringData inputField = BinaryStringData.fromString("user2");
+ assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
+ .isEqualTo("user1,user2");
+ }
+
+ @Test
+ public void testFieldMaxAgg() {
+ FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType());
+ Integer accumulator = 1;
+ Integer inputField = 10;
+ assertThat(fieldMaxAgg.agg(accumulator, inputField)).isEqualTo(10);
+ }
+
+ @Test
+ public void testFieldMinAgg() {
+ FieldMinAgg fieldMinAgg = new FieldMinAgg(new IntType());
+ Integer accumulator = 1;
+ Integer inputField = 10;
+ assertThat(fieldMinAgg.agg(accumulator, inputField)).isEqualTo(1);
+ }
+
+ @Test
+ public void testFieldSumAgg() {
+ FieldSumAgg fieldSumAgg = new FieldSumAgg(new IntType());
+ Integer accumulator = 1;
+ Integer inputField = 10;
+ assertThat(fieldSumAgg.agg(accumulator, inputField)).isEqualTo(11);
+ }
+}