This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 69367649 [FLINK-27626] Introduce pre-aggregated merge to table store
69367649 is described below

commit 69367649cba9af2d2628287d84ea48e254d315b0
Author: Hannankan <[email protected]>
AuthorDate: Mon Sep 19 17:17:39 2022 +0800

    [FLINK-27626] Introduce pre-aggregated merge to table store
    
    This closes #270
---
 docs/content/docs/development/create-table.md      |  42 ++
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../flink/table/store/utils/RowDataUtils.java      |  47 ++
 .../flink/table/store/utils/RowDataUtilsTest.java  |  30 +
 .../store/connector/source/FlinkSourceBuilder.java |  13 +-
 .../store/connector/PreAggregationITCase.java      | 771 +++++++++++++++++++++
 .../org/apache/flink/table/store/CoreOptions.java  |   4 +-
 .../compact/aggregate/AggregateMergeFunction.java  | 121 ++++
 .../compact/aggregate/FieldAggregator.java         |  75 ++
 .../compact/aggregate/FieldBoolAndAgg.java         |  46 ++
 .../compact/aggregate/FieldBoolOrAgg.java          |  46 ++
 .../aggregate/FieldLastNonNullValueAgg.java        |  34 +
 .../compact/aggregate/FieldLastValueAgg.java       |  34 +
 .../compact/aggregate/FieldListaggAgg.java         |  60 ++
 .../mergetree/compact/aggregate/FieldMaxAgg.java   |  47 ++
 .../mergetree/compact/aggregate/FieldMinAgg.java   |  47 ++
 .../mergetree/compact/aggregate/FieldSumAgg.java   |  78 +++
 .../table/ChangelogWithKeyFileStoreTable.java      |  21 +-
 .../compact/aggregate/FieldAggregatorTest.java     | 115 +++
 19 files changed, 1623 insertions(+), 10 deletions(-)

diff --git a/docs/content/docs/development/create-table.md 
b/docs/content/docs/development/create-table.md
index 4a43fa7d..1b582c0f 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -275,6 +275,48 @@ __Note:__
 - It is best not to have NULL values in the fields, NULL will not overwrite 
data.
 {{< /hint >}}
 
+## Pre-aggregate
+You can configure pre-aggregate for each value field from options:
+
+```sql
+CREATE TABLE MyTable (
+  product_id BIGINT,
+  price DOUBLE,
+  sales BIGINT,
+  PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+  'merge-engine' = 'aggregation',
+  'fields.price.aggregate-function'='max',
+  'fields.sales.aggregate-function'='sum'
+);
+```
+Each value field is aggregated with the latest data one by one
+under the same primary key according to the aggregate function.
+
+For example, the inputs:
+- <1, 23.0, 15>
+- <1, 30.2, 20>
+
+Output:
+- <1, 30.2, 35>
+
+### Supported Aggregate Functions and Related Data Types
+Supported aggregate functions include `sum`, `max/min`, `last_non_null_value`, 
`last_value`, `listagg`, `bool_or/bool_and`.
+These functions support different data types.
+
+- `sum` supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE 
data types.
+- `max/min` support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, 
DOUBLE, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ data types.
+- `last_non_null_value/last_value` support all data types.
+- `listagg` supports the STRING data type.
+- `bool_and/bool_or` support BOOLEAN data type.
+
+{{< hint info >}}
+__Note:__
+- Pre-aggregate is only supported for tables with a primary key.
+- Pre-aggregate is not supported for streaming consumption.
+- Pre-aggregate currently only supports INSERT changes.
+{{< /hint >}}
+
 ## Append-only Table
 
 Append-only tables are a performance feature that only accepts `INSERT_ONLY` 
data to append to the storage instead of 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index b093a1a4..c01d1bc8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -144,7 +144,7 @@
             <td><h5>merge-engine</h5></td>
             <td style="word-wrap: break-word;">deduplicate</td>
             <td><p>Enum</p></td>
-            <td>Specify the merge engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last 
row.</li><li>"partial-update": Partial update non-null fields.</li></ul></td>
+            <td>Specify the merge engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last 
row.</li><li>"partial-update": Partial update non-null 
fields.</li><li>"aggregation": Aggregate fields with same primary 
key.</li></ul></td>
         </tr>
         <tr>
             <td><h5>num-levels</h5></td>
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
index d95824ce..a38c0ad4 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.utils;
 
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
@@ -27,6 +28,7 @@ import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RawValueData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryArrayData;
 import org.apache.flink.table.data.binary.BinaryMapData;
 import org.apache.flink.table.data.binary.BinaryRawValueData;
@@ -38,6 +40,7 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
@@ -295,4 +298,48 @@ public class RowDataUtils {
             };
         }
     }
+
+    public static int compare(Object x, Object y, LogicalTypeRoot type) {
+        int ret;
+        switch (type) {
+            case DECIMAL:
+                DecimalData xDD = (DecimalData) x;
+                DecimalData yDD = (DecimalData) y;
+                assert xDD.scale() == yDD.scale() : "Inconsistent scale of 
aggregate DecimalData!";
+                assert xDD.precision() == yDD.precision()
+                        : "Inconsistent precision of aggregate DecimalData!";
+                ret = DecimalDataUtils.compare(xDD, yDD);
+                break;
+            case TINYINT:
+                ret = Byte.compare((byte) x, (byte) y);
+                break;
+            case SMALLINT:
+                ret = Short.compare((short) x, (short) y);
+                break;
+            case INTEGER:
+            case DATE:
+                ret = Integer.compare((int) x, (int) y);
+                break;
+            case BIGINT:
+                ret = Long.compare((long) x, (long) y);
+                break;
+            case FLOAT:
+                ret = Float.compare((float) x, (float) y);
+                break;
+            case DOUBLE:
+                ret = Double.compare((double) x, (double) y);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                TimestampData xDD1 = (TimestampData) x;
+                TimestampData yDD1 = (TimestampData) y;
+                ret = xDD1.compareTo(yDD1);
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException();
+            default:
+                throw new IllegalArgumentException();
+        }
+        return ret;
+    }
 }
diff --git 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
index 63e7670c..66194672 100644
--- 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
+++ 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/utils/RowDataUtilsTest.java
@@ -23,14 +23,20 @@ import 
org.apache.flink.connector.datagen.table.RandomGeneratorVisitor;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link RowDataUtils}. */
@@ -106,4 +112,28 @@ public class RowDataUtilsTest {
     private BinaryRowData toBinary(RowData row) {
         return serializer.toBinaryRow(row).copy();
     }
+
+    @Test
+    public void testCompare() {
+        // test DECIMAL data type
+        DecimalData xDecimalData = DecimalData.fromBigDecimal(new 
BigDecimal("12.34"), 4, 2);
+        DecimalData yDecimalData = DecimalData.fromBigDecimal(new 
BigDecimal("13.14"), 4, 2);
+        assertThat(RowDataUtils.compare(xDecimalData, yDecimalData, 
LogicalTypeRoot.DECIMAL))
+                .isLessThan(0);
+
+        // test DOUBLE data type
+        double xDouble = 13.14;
+        double yDouble = 12.13;
+        assertThat(RowDataUtils.compare(xDouble, yDouble, 
LogicalTypeRoot.DOUBLE)).isGreaterThan(0);
+
+        // test TIMESTAMP_WITHOUT_TIME_ZONE data type
+        TimestampData xTimestampData = 
TimestampData.fromLocalDateTime(LocalDateTime.now());
+        TimestampData yTimestampData = 
TimestampData.fromTimestamp(xTimestampData.toTimestamp());
+        assertThat(
+                        RowDataUtils.compare(
+                                xTimestampData,
+                                yTimestampData,
+                                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE))
+                .isEqualTo(0);
+    }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index c3f565a6..e975796c 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
 import java.util.Optional;
 
 import static 
org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
@@ -128,10 +129,18 @@ public class FlinkSourceBuilder {
     private Source<RowData, ?, ?> buildSource() {
         if (isContinuous) {
             // TODO move validation to a dedicated method
+            MergeEngine mergeEngine = conf.get(MERGE_ENGINE);
+            HashMap<MergeEngine, String> mergeEngineDesc =
+                    new HashMap<MergeEngine, String>() {
+                        {
+                            put(MergeEngine.PARTIAL_UPDATE, "Partial update");
+                            put(MergeEngine.AGGREGATE, "Pre-aggregate");
+                        }
+                    };
             if (table.schema().primaryKeys().size() > 0
-                    && conf.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
+                    && mergeEngineDesc.containsKey(mergeEngine)) {
                 throw new ValidationException(
-                        "Partial update continuous reading is not supported.");
+                        mergeEngineDesc.get(mergeEngine) + " continuous 
reading is not supported.");
             }
 
             LogStartupMode startupMode = conf.get(LOG_SCAN);
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
new file mode 100644
index 00000000..c0cfa506
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for pre-aggregation. */
+public class PreAggregationITCase {
+    /** ITCase for bool_or/bool_and aggregate function. */
+    public static class BoolOrAndAggregation extends FileStoreTableITCase {
+        @Override
+        protected List<String> ddl() {
+            return Collections.singletonList(
+                    "CREATE TABLE IF NOT EXISTS T7 ("
+                            + "j INT, k INT, "
+                            + "a BOOLEAN, "
+                            + "b BOOLEAN,"
+                            + "PRIMARY KEY (j,k) NOT ENFORCED)"
+                            + " WITH ('merge-engine'='aggregation', "
+                            + "'fields.a.aggregate-function'='bool_or',"
+                            + "'fields.b.aggregate-function'='bool_and'"
+                            + ");");
+        }
+
+        @Test
+        public void testMergeInMemory() {
+            batchSql(
+                    "INSERT INTO T7 VALUES "
+                            + "(1, 2, CAST('TRUE' AS  BOOLEAN), CAST('TRUE' AS 
BOOLEAN)),"
+                            + "(1, 2, CAST(NULL AS BOOLEAN), CAST(NULL AS 
BOOLEAN)), "
+                            + "(1, 2, CAST('FALSE' AS BOOLEAN), CAST('FALSE' 
AS BOOLEAN))");
+            List<Row> result = batchSql("SELECT * FROM T7");
+            assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, true, 
false));
+        }
+
+        @Test
+        public void testMergeRead() {
+            batchSql(
+                    "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), 
CAST('TRUE' AS BOOLEAN))");
+            batchSql("INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN), 
CAST(NULL AS BOOLEAN))");
+            batchSql(
+                    "INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN), 
CAST('FALSE' AS BOOLEAN))");
+
+            List<Row> result = batchSql("SELECT * FROM T7");
+            assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, true, 
false));
+        }
+
+        @Test
+        public void testMergeCompaction() {
+            // Wait compaction
+            batchSql("ALTER TABLE T7 SET ('commit.force-compact'='true')");
+
+            // key 1 2
+            batchSql(
+                    "INSERT INTO T7 VALUES (1, 2, CAST('TRUE' AS  BOOLEAN), 
CAST('TRUE' AS BOOLEAN))");
+            batchSql("INSERT INTO T7 VALUES (1, 2, CAST(NULL AS BOOLEAN), 
CAST(NULL AS BOOLEAN))");
+            batchSql(
+                    "INSERT INTO T7 VALUES (1, 2, CAST('FALSE' AS BOOLEAN), 
CAST('FALSE' AS BOOLEAN))");
+
+            // key 1 3
+            batchSql(
+                    "INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS  BOOLEAN), 
CAST('TRUE' AS BOOLEAN))");
+            batchSql("INSERT INTO T7 VALUES (1, 3, CAST(NULL AS BOOLEAN), 
CAST(NULL AS BOOLEAN))");
+            batchSql(
+                    "INSERT INTO T7 VALUES (1, 3, CAST('FALSE' AS BOOLEAN), 
CAST('TRUE' AS BOOLEAN))");
+
+            assertThat(batchSql("SELECT * FROM T7"))
+                    .containsExactlyInAnyOrder(
+                            Row.of(1, 2, true, false), Row.of(1, 3, false, 
true));
+        }
+
+        @Test
+        public void testStreamingRead() {
+            assertThatThrownBy(
+                    () -> sEnv.from("T7").execute().print(),
+                    "Pre-aggregate continuous reading is not supported");
+        }
+    }
+
+    /** ITCase for listagg aggregate function. */
+    public static class ListAggAggregation extends FileStoreTableITCase {
+        @Override
+        protected List<String> ddl() {
+            return Collections.singletonList(
+                    "CREATE TABLE IF NOT EXISTS T6 ("
+                            + "j INT, k INT, "
+                            + "a STRING, "
+                            + "PRIMARY KEY (j,k) NOT ENFORCED)"
+                            + " WITH ('merge-engine'='aggregation', "
+                            + "'fields.a.aggregate-function'='listagg'"
+                            + ");");
+        }
+
+        @Test
+        public void testMergeInMemory() {
+            batchSql(
+                    "INSERT INTO T6 VALUES "
+                            + "(1, 2, 'first line'),"
+                            + "(1, 2, CAST(NULL AS STRING)), "
+                            + "(1, 2, 'second line')");
+            List<Row> result = batchSql("SELECT * FROM T6");
+            assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, "first 
line,second line"));
+        }
+
+        @Test
+        public void testMergeRead() {
+            batchSql("INSERT INTO T6 VALUES (1, 2, 'first line')");
+            batchSql("INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))");
+            batchSql("INSERT INTO T6 VALUES (1, 2, 'second line')");
+
+            List<Row> result = batchSql("SELECT * FROM T6");
+            assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, "first 
line,second line"));
+        }
+
+        @Test
+        public void testMergeCompaction() {
+            // Wait compaction
+            batchSql("ALTER TABLE T6 SET ('commit.force-compact'='true')");
+
+            // key 1 2
+            batchSql("INSERT INTO T6 VALUES (1, 2, 'first line')");
+            batchSql("INSERT INTO T6 VALUES (1, 2, CAST(NULL AS STRING))");
+            batchSql("INSERT INTO T6 VALUES (1, 2, 'second line')");
+
+            // key 1 3
+            batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))");
+            batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))");
+            batchSql("INSERT INTO T6 VALUES (1, 3, CAST(NULL AS STRING))");
+
+            assertThat(batchSql("SELECT * FROM T6"))
+                    .containsExactlyInAnyOrder(
+                            Row.of(1, 2, "first line,second line"), Row.of(1, 
3, null));
+        }
+
+        @Test
+        public void testStreamingRead() {
+            assertThatThrownBy(
+                    () -> sEnv.from("T6").execute().print(),
+                    "Pre-aggregate continuous reading is not supported");
+        }
+    }
+
+    /** ITCase for last value aggregate function. */
+    public static class LastValueAggregation extends FileStoreTableITCase {
+        @Override
+        protected List<String> ddl() {
+            return Collections.singletonList(
+                    "CREATE TABLE IF NOT EXISTS T5 ("
+                            + "j INT, k INT, "
+                            + "a INT, "
+                            + "i DATE,"
+                            + "PRIMARY KEY (j,k) NOT ENFORCED)"
+                            + " WITH ('merge-engine'='aggregation', "
+                            + "'fields.a.aggregate-function'='last_value', "
+                            + "'fields.i.aggregate-function'='last_value'"
+                            + ");");
+        }
+
+        @Test
+        public void testMergeInMemory() {
+            batchSql(
+                    "INSERT INTO T5 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS 
DATE)),"
+                            + "(1, 2, 2, CAST('2020-01-02' AS DATE)), "
+                            + "(1, 2, 3, CAST(NULL AS DATE))");
+            List<Row> result = batchSql("SELECT * FROM T5");
+            assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, 
null));
+        }
+
+        @Test
+        public void testMergeRead() {
+            batchSql("INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT), 
CAST('2020-01-01' AS DATE))");
+            batchSql("INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS 
DATE))");
+            batchSql("INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))");
+
+            List<Row> result = batchSql("SELECT * FROM T5");
+            assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, 
null));
+        }
+
+        @Test
+        public void testMergeCompaction() {
+            // Wait compaction
+            batchSql("ALTER TABLE T5 SET ('commit.force-compact'='true')");
+
+            // key 1 2
+            batchSql("INSERT INTO T5 VALUES (1, 2, CAST(NULL AS INT), 
CAST('2020-01-01' AS DATE))");
+            batchSql("INSERT INTO T5 VALUES (1, 2, 2, CAST('2020-01-02' AS 
DATE))");
+            batchSql("INSERT INTO T5 VALUES (1, 2, 3, CAST(NULL AS DATE))");
+
+            // key 1 3
+            batchSql("INSERT INTO T5 VALUES (1, 3, 3, CAST('2020-01-01' AS 
DATE))");
+            batchSql("INSERT INTO T5 VALUES (1, 3, 2, CAST(NULL AS DATE))");
+            batchSql("INSERT INTO T5 VALUES (1, 3, CAST(NULL AS INT), 
CAST('2022-01-02' AS DATE))");
+
+            assertThat(batchSql("SELECT * FROM T5"))
+                    .containsExactlyInAnyOrder(
+                            Row.of(1, 2, 3, null), Row.of(1, 3, null, 
LocalDate.of(2022, 1, 2)));
+        }
+
+        @Test
+        public void testStreamingRead() {
+            assertThatThrownBy(
+                    () -> sEnv.from("T5").execute().print(),
+                    "Pre-aggregate continuous reading is not supported");
+        }
+    }
+
+    /** ITCase for last non-null value aggregate function. */
+    public static class LastNonNullValueAggregation extends 
FileStoreTableITCase {
+        @Override
+        protected List<String> ddl() {
+            return Collections.singletonList(
+                    "CREATE TABLE IF NOT EXISTS T4 ("
+                            + "j INT, k INT, "
+                            + "a INT, "
+                            + "i DATE,"
+                            + "PRIMARY KEY (j,k) NOT ENFORCED)"
+                            + " WITH ('merge-engine'='aggregation', "
+                            + 
"'fields.a.aggregate-function'='last_non_null_value', "
+                            + 
"'fields.i.aggregate-function'='last_non_null_value'"
+                            + ");");
+        }
+
+        @Test
+        public void testMergeInMemory() {
+            batchSql(
+                    "INSERT INTO T4 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), CAST('2020-01-01' AS 
DATE)),"
+                            + "(1, 2, 2, CAST('2020-01-02' AS DATE)), "
+                            + "(1, 2, 3, CAST(NULL AS DATE))");
+            List<Row> result = batchSql("SELECT * FROM T4");
+            assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, 
LocalDate.of(2020, 1, 2)));
+        }
+
+        @Test
+        public void testMergeRead() {
+            batchSql("INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT), 
CAST('2020-01-01' AS DATE))");
+            batchSql("INSERT INTO T4 VALUES (1, 2, 2, CAST('2020-01-02' AS 
DATE))");
+            batchSql("INSERT INTO T4 VALUES (1, 2, 3, CAST(NULL AS DATE))");
+
+            List<Row> result = batchSql("SELECT * FROM T4");
+            assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, 
LocalDate.of(2020, 1, 2)));
+        }
+
+        @Test
+        public void testMergeCompaction() {
+            // Wait compaction
+            batchSql("ALTER TABLE T4 SET ('commit.force-compact'='true')");
+
+            // key 1 2
+            batchSql("INSERT INTO T4 VALUES (1, 2, CAST(NULL AS INT), 
CAST('2020-01-01' AS DATE))");
+            batchSql("INSERT INTO T4 VALUES (1, 2, 2, CAST('2020-01-02' AS 
DATE))");
+            batchSql("INSERT INTO T4 VALUES (1, 2, 3, CAST(NULL AS DATE))");
+
+            // key 1 3
+            batchSql("INSERT INTO T4 VALUES (1, 3, 3, CAST('2020-01-01' AS 
DATE))");
+            batchSql("INSERT INTO T4 VALUES (1, 3, 2, CAST(NULL AS DATE))");
+            batchSql("INSERT INTO T4 VALUES (1, 3, CAST(NULL AS INT), 
CAST('2022-01-02' AS DATE))");
+
+            assertThat(batchSql("SELECT * FROM T4"))
+                    .containsExactlyInAnyOrder(
+                            Row.of(1, 2, 3, LocalDate.of(2020, 1, 2)),
+                            Row.of(1, 3, 2, LocalDate.of(2022, 1, 2)));
+        }
+
+        @Test
+        public void testStreamingRead() {
+            assertThatThrownBy(
+                    () -> sEnv.from("T4").execute().print(),
+                    "Pre-aggregate continuous reading is not supported");
+        }
+    }
+
+    /** ITCase for min aggregate function. */
+    public static class MinAggregation extends FileStoreTableITCase {
+        @Override
+        protected List<String> ddl() {
+            return Collections.singletonList(
+                    "CREATE TABLE IF NOT EXISTS T3 ("
+                            + "j INT, k INT, "
+                            + "a INT, "
+                            + "b Decimal(4,2), "
+                            + "c TINYINT,"
+                            + "d SMALLINT,"
+                            + "e BIGINT,"
+                            + "f FLOAT,"
+                            + "h DOUBLE,"
+                            + "i DATE,"
+                            + "l TIMESTAMP,"
+                            + "PRIMARY KEY (j,k) NOT ENFORCED)"
+                            + " WITH ('merge-engine'='aggregation', "
+                            + "'fields.a.aggregate-function'='min', "
+                            + "'fields.b.aggregate-function'='min', "
+                            + "'fields.c.aggregate-function'='min', "
+                            + "'fields.d.aggregate-function'='min', "
+                            + "'fields.e.aggregate-function'='min', "
+                            + "'fields.f.aggregate-function'='min',"
+                            + "'fields.h.aggregate-function'='min',"
+                            + "'fields.i.aggregate-function'='min',"
+                            + "'fields.l.aggregate-function'='min'"
+                            + ");");
+        }
+
+        @Test
+        public void testMergeInMemory() {
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), 1.01, CAST(-1 AS 
TINYINT), CAST(-1 AS SMALLINT), "
+                            + "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS 
DOUBLE), CAST('2020-01-01' AS DATE), "
+                            + "CAST('0001-01-01 01:01:01' AS TIMESTAMP)),"
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS 
SMALLINT), "
+                            + "CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS 
DOUBLE), CAST('2020-01-02' AS DATE), "
+                            + "CAST('0002-01-01 01:01:01' AS TIMESTAMP)), "
+                            + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS 
SMALLINT), "
+                            + "CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS 
DOUBLE), CAST('2022-01-02' AS DATE), "
+                            + "CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+            List<Row> result = batchSql("SELECT * FROM T3");
+            assertThat(result)
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    2,
+                                    new BigDecimal("1.01"),
+                                    (byte) -1,
+                                    (short) -1,
+                                    (long) 1000,
+                                    (float) -1.11,
+                                    -1.11,
+                                    LocalDate.of(2020, 1, 1),
+                                    LocalDateTime.of(1, 1, 1, 1, 1, 1)));
+        }
+
+        @Test
+        public void testMergeRead() {
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS 
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' 
AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS 
SMALLINT), CAST(100000 AS BIGINT), "
+                            + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' 
AS DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS 
SMALLINT), CAST(10000000 AS BIGINT), "
+                            + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS 
DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+            List<Row> result = batchSql("SELECT * FROM T3");
+            assertThat(result)
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    2,
+                                    new BigDecimal("1.01"),
+                                    (byte) -1,
+                                    (short) -1,
+                                    (long) 1000,
+                                    (float) -1.11,
+                                    -1.11,
+                                    LocalDate.of(2020, 1, 1),
+                                    LocalDateTime.of(1, 1, 1, 1, 1, 1)));
+        }
+
+        @Test
+        public void testMergeCompaction() {
+            // Force a compaction with every commit so that merging is exercised in
+            // the compaction path, not only at read time.
+            batchSql("ALTER TABLE T3 SET ('commit.force-compact'='true')");
+
+            // key 1 2
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS 
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' 
AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS 
SMALLINT), CAST(100000 AS BIGINT), "
+                            + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' 
AS DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS 
SMALLINT), CAST(10000000 AS BIGINT), "
+                            + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS 
DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+            // key 1 3
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS 
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' 
AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS 
SMALLINT), CAST(100000 AS BIGINT), "
+                            + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' 
AS DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T3 VALUES "
+                            + "(1, 3, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS 
SMALLINT), CAST(10000000 AS BIGINT), "
+                            + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS 
DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+            // Two merged rows are expected, one per distinct primary key.
+            assertThat(batchSql("SELECT * FROM T3"))
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    2,
+                                    new BigDecimal("1.01"),
+                                    (byte) -1,
+                                    (short) -1,
+                                    (long) 1000,
+                                    (float) -1.11,
+                                    -1.11,
+                                    LocalDate.of(2020, 1, 1),
+                                    LocalDateTime.of(1, 1, 1, 1, 1, 1)),
+                            Row.of(
+                                    1,
+                                    3,
+                                    3,
+                                    new BigDecimal("1.01"),
+                                    (byte) -1,
+                                    (short) -1,
+                                    (long) 1000,
+                                    (float) -1.11,
+                                    -1.11,
+                                    LocalDate.of(2020, 1, 1),
+                                    LocalDateTime.of(1, 1, 1, 1, 1, 1)));
+        }
+
+        @Test
+        public void testStreamingRead() {
+            // The two-argument assertThatThrownBy(callable, description) only uses
+            // the description when NO exception is thrown — it never verifies the
+            // message. Chain an explicit assertion so the test actually checks why
+            // streaming reads are rejected.
+            assertThatThrownBy(() -> sEnv.from("T3").execute().print())
+                    .hasStackTraceContaining(
+                            "Pre-aggregate continuous reading is not supported");
+        }
+    }
+
+    /** ITCase for max aggregate function. */
+    public static class MaxAggregation extends FileStoreTableITCase {
+        @Override
+        protected List<String> ddl() {
+            // Every non-key column of T2 is configured with the 'max' aggregate.
+            return Collections.singletonList(
+                    "CREATE TABLE IF NOT EXISTS T2 ("
+                            + "j INT, k INT, "
+                            + "a INT, "
+                            + "b Decimal(4,2), "
+                            + "c TINYINT,"
+                            + "d SMALLINT,"
+                            + "e BIGINT,"
+                            + "f FLOAT,"
+                            + "h DOUBLE,"
+                            + "i DATE,"
+                            + "l TIMESTAMP,"
+                            + "PRIMARY KEY (j,k) NOT ENFORCED)"
+                            + " WITH ('merge-engine'='aggregation', "
+                            + "'fields.a.aggregate-function'='max', "
+                            + "'fields.b.aggregate-function'='max', "
+                            + "'fields.c.aggregate-function'='max', "
+                            + "'fields.d.aggregate-function'='max', "
+                            + "'fields.e.aggregate-function'='max', "
+                            + "'fields.f.aggregate-function'='max',"
+                            + "'fields.h.aggregate-function'='max',"
+                            + "'fields.i.aggregate-function'='max',"
+                            + "'fields.l.aggregate-function'='max'"
+                            + ");");
+        }
+
+        @Test
+        public void testMergeInMemory() {
+            // A single multi-row INSERT: all three rows share key (1, 2), so the
+            // merge happens in the in-memory write buffer before any file is written.
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), "
+                            + "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), "
+                            + "CAST('0001-01-01 01:01:01' AS TIMESTAMP)),"
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), "
+                            + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), "
+                            + "CAST('0002-01-01 01:01:01' AS TIMESTAMP)), "
+                            + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), "
+                            + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), "
+                            + "CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+            List<Row> result = batchSql("SELECT * FROM T2");
+            // Each field of the merged row is the per-field maximum; NULL inputs
+            // are ignored.
+            assertThat(result)
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    3,
+                                    new BigDecimal("10.00"),
+                                    (byte) 2,
+                                    (short) 2,
+                                    (long) 10000000,
+                                    (float) 1.11,
+                                    1.21,
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDateTime.of(2, 1, 1, 2, 0, 0)));
+        }
+
+        @Test
+        public void testMergeRead() {
+            // Three separate commits with the same key: merging happens at read time.
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), "
+                            + "CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+                            + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), "
+                            + "CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, "
+                            + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), "
+                            + "CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+            List<Row> result = batchSql("SELECT * FROM T2");
+            assertThat(result)
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    3,
+                                    new BigDecimal("10.00"),
+                                    (byte) 2,
+                                    (short) 2,
+                                    (long) 10000000,
+                                    (float) 1.11,
+                                    1.21,
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDateTime.of(2, 1, 1, 2, 0, 0)));
+        }
+
+        @Test
+        public void testMergeCompaction() {
+            // Force a compaction with every commit so that merging is exercised in
+            // the compaction path as well.
+            batchSql("ALTER TABLE T2 SET ('commit.force-compact'='true')");
+
+            // key 1 2
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+                            + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, "
+                            + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+            // key 1 3
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('0001-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+                            + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('0002-01-01 01:01:01' AS TIMESTAMP))");
+            batchSql(
+                    "INSERT INTO T2 VALUES "
+                            + "(1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, "
+                            + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('0002-01-01 02:00:00' AS TIMESTAMP))");
+
+            // One merged row per distinct primary key.
+            assertThat(batchSql("SELECT * FROM T2"))
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    3,
+                                    new BigDecimal("10.00"),
+                                    (byte) 2,
+                                    (short) 2,
+                                    (long) 10000000,
+                                    (float) 1.11,
+                                    1.21,
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDateTime.of(2, 1, 1, 2, 0, 0)),
+                            Row.of(
+                                    1,
+                                    3,
+                                    6,
+                                    new BigDecimal("10.00"),
+                                    (byte) 2,
+                                    (short) 2,
+                                    (long) 10000000,
+                                    (float) 1.11,
+                                    1.21,
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDateTime.of(2, 1, 1, 2, 0, 0)));
+        }
+
+        @Test
+        public void testStreamingRead() {
+            // assertThatThrownBy(callable, description) uses the second argument only
+            // as a description when NO exception is thrown; it does not check the
+            // message. Chain an explicit assertion to verify the failure reason.
+            assertThatThrownBy(() -> sEnv.from("T2").execute().print())
+                    .hasStackTraceContaining(
+                            "Pre-aggregate continuous reading is not supported");
+        }
+    }
+
+    /** ITCase for sum aggregate function. */
+    public static class SumAggregation extends FileStoreTableITCase {
+        @Override
+        protected List<String> ddl() {
+            // Every non-key column of T1 is configured with the 'sum' aggregate.
+            return Collections.singletonList(
+                    "CREATE TABLE IF NOT EXISTS T1 ("
+                            + "j INT, k INT, "
+                            + "a INT, "
+                            + "b Decimal(4,2), "
+                            + "c TINYINT,"
+                            + "d SMALLINT,"
+                            + "e BIGINT,"
+                            + "f FLOAT,"
+                            + "h DOUBLE,"
+                            + "PRIMARY KEY (j,k) NOT ENFORCED)"
+                            + " WITH ('merge-engine'='aggregation', "
+                            + "'fields.a.aggregate-function'='sum', "
+                            + "'fields.b.aggregate-function'='sum', "
+                            + "'fields.c.aggregate-function'='sum', "
+                            + "'fields.d.aggregate-function'='sum', "
+                            + "'fields.e.aggregate-function'='sum', "
+                            + "'fields.f.aggregate-function'='sum',"
+                            + "'fields.h.aggregate-function'='sum'"
+                            + ");");
+        }
+
+        @Test
+        public void testMergeInMemory() {
+            // A single multi-row INSERT: all three rows share key (1, 2), so the
+            // merge happens in the in-memory write buffer before any file is written.
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), "
+                            + "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE)),"
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), "
+                            + "CAST(100000 AS BIGINT), -1.11, CAST(1.11 AS DOUBLE)), "
+                            + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), "
+                            + "CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE))");
+            List<Row> result = batchSql("SELECT * FROM T1");
+            // Each field of the merged row is the per-field sum; NULL inputs are
+            // ignored (a: NULL + 2 + 3 = 5).
+            assertThat(result)
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    5,
+                                    new BigDecimal("12.11"),
+                                    (byte) 4,
+                                    (short) 2,
+                                    (long) 10101000,
+                                    (float) 0,
+                                    1.11));
+        }
+
+        @Test
+        public void testMergeRead() {
+            // Three separate commits with the same key: merging happens at read time.
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE))");
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), "
+                            + "CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))");
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), "
+                            + "-1.11, CAST(-1.11 AS DOUBLE))");
+
+            List<Row> result = batchSql("SELECT * FROM T1");
+            assertThat(result)
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    6,
+                                    new BigDecimal("12.11"),
+                                    (byte) 4,
+                                    (short) 2,
+                                    (long) 10101000,
+                                    (float) 0,
+                                    1.11));
+        }
+
+        @Test
+        public void testMergeCompaction() {
+            // Force a compaction with every commit so that merging is exercised in
+            // the compaction path as well.
+            batchSql("ALTER TABLE T1 SET ('commit.force-compact'='true')");
+
+            // key 1 2
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 2, 1, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE))");
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), "
+                            + "CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))");
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), "
+                            + "-1.11, CAST(-1.11 AS DOUBLE))");
+
+            // key 1 3
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 3, 2, 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+                            + "1.11, CAST(1.11 AS DOUBLE))");
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 3, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), "
+                            + "CAST(NULL AS FLOAT), CAST(1.11 AS DOUBLE))");
+            batchSql(
+                    "INSERT INTO T1 VALUES "
+                            + "(1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), "
+                            + "-1.11, CAST(-1.11 AS DOUBLE))");
+
+            // One merged row per distinct primary key.
+            assertThat(batchSql("SELECT * FROM T1"))
+                    .containsExactlyInAnyOrder(
+                            Row.of(
+                                    1,
+                                    2,
+                                    6,
+                                    new BigDecimal("12.11"),
+                                    (byte) 4,
+                                    (short) 2,
+                                    (long) 10101000,
+                                    (float) 0,
+                                    1.11),
+                            Row.of(
+                                    1,
+                                    3,
+                                    7,
+                                    new BigDecimal("12.11"),
+                                    (byte) 4,
+                                    (short) 2,
+                                    (long) 10101000,
+                                    (float) 0,
+                                    1.11));
+        }
+
+        @Test
+        public void testStreamingRead() {
+            // assertThatThrownBy(callable, description) uses the second argument only
+            // as a description when NO exception is thrown; it does not check the
+            // message. Chain an explicit assertion to verify the failure reason.
+            assertThatThrownBy(() -> sEnv.from("T1").execute().print())
+                    .hasStackTraceContaining(
+                            "Pre-aggregate continuous reading is not supported");
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index ce10ad5f..31b6b1f2 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -482,7 +482,9 @@ public class CoreOptions implements Serializable {
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
 
-        PARTIAL_UPDATE("partial-update", "Partial update non-null fields.");
+        PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
+
+        AGGREGATE("aggregation", "Aggregate fields with same primary key.");
 
         private final String value;
         private final String description;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
new file mode 100644
index 00000000..3fbb4bbd
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link MergeFunction} where the key is the primary key (unique) and the value is a partial
+ * record; on merge, each value field is folded into an accumulator by its configured {@link
+ * FieldAggregator}.
+ */
+public class AggregateMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    // One getter per value field; used to read both the accumulator row and incoming rows.
+    private final RowData.FieldGetter[] getters;
+
+    // Holds the per-field aggregators; shared with copies produced by copy().
+    private final RowAggregator rowAggregator;
+
+    // Accumulator row. Recreated by reset() before each merge, hence transient.
+    private transient GenericRowData row;
+
+    public AggregateMergeFunction(RowData.FieldGetter[] getters, RowAggregator 
rowAggregator) {
+        this.getters = getters;
+        this.rowAggregator = rowAggregator;
+    }
+
+    @Override
+    public void reset() {
+        // Start from an all-null accumulator; aggregators treat null as "no value yet".
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(KeyValue kv) {
+        // Only upserts are accepted: a retraction cannot be undone once pre-aggregated.
+        checkArgument(
+                kv.valueKind() == RowKind.INSERT || kv.valueKind() == 
RowKind.UPDATE_AFTER,
+                "Pre-aggregate can not accept delete records!");
+        // Fold every field of the incoming row into the accumulator row.
+        for (int i = 0; i < getters.length; i++) {
+            FieldAggregator fieldAggregator = 
rowAggregator.getFieldAggregatorAtPos(i);
+            Object accumulator = getters[i].getFieldOrNull(row);
+            Object inputField = getters[i].getFieldOrNull(kv.value());
+            Object mergedField = fieldAggregator.agg(accumulator, inputField);
+            row.setField(i, mergedField);
+        }
+    }
+
+    @Nullable
+    @Override
+    public RowData getValue() {
+        return row;
+    }
+
+    @Override
+    public MergeFunction copy() {
+        // RowData.FieldGetter is thread safe
+        return new AggregateMergeFunction(getters, rowAggregator);
+    }
+
+    /** Provide an Aggregator for merge a new row data. */
+    public static class RowAggregator implements Serializable {
+        public static final String FIELDS = "fields";
+        public static final String AGG_FUNCTION = "aggregate-function";
+
+        private final FieldAggregator[] fieldAggregators;
+
+        // Builds one FieldAggregator per field from table options of the form
+        // 'fields.<name>.aggregate-function'.
+        public RowAggregator(
+                Configuration sqlConf,
+                List<String> fieldNames,
+                List<LogicalType> fieldTypes,
+                List<String> primaryKeys) {
+            fieldAggregators = new FieldAggregator[fieldNames.size()];
+            for (int i = 0; i < fieldNames.size(); i++) {
+                String fieldName = fieldNames.get(i);
+                LogicalType fieldType = fieldTypes.get(i);
+                // aggregate by primary keys, so they do not aggregate
+                boolean isPrimaryKey = primaryKeys.contains(fieldName);
+                // NOTE(review): getString(...) returns null when the user did not set an
+                // aggregate function for this field (option has no default) — confirm
+                // createFieldAggregator handles a null function name.
+                String strAggFunc =
+                        sqlConf.getString(
+                                key(FIELDS + "." + fieldName + "." + 
AGG_FUNCTION)
+                                        .stringType()
+                                        .noDefaultValue()
+                                        .withDescription(
+                                                "Get " + fieldName + "'s 
aggregate function"));
+                fieldAggregators[i] =
+                        FieldAggregator.createFieldAggregator(fieldType, 
strAggFunc, isPrimaryKey);
+            }
+        }
+
+        public FieldAggregator getFieldAggregatorAtPos(int fieldPos) {
+            return fieldAggregators[fieldPos];
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
new file mode 100644
index 00000000..a9780eba
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+
+/** Abstract base class for aggregating one field of a row during merge. */
+public abstract class FieldAggregator implements Serializable {
+
+    // Logical type of the field this aggregator merges; subclasses dispatch on it.
+    protected LogicalType fieldType;
+
+    public FieldAggregator(LogicalType logicalType) {
+        this.fieldType = logicalType;
+    }
+
+    /**
+     * Creates the concrete aggregator for one field.
+     *
+     * @param fieldType logical type of the field
+     * @param strAgg configured aggregate function name; may be null when the user did not
+     *     configure one for this field
+     * @param isPrimaryKey whether the field belongs to the primary key; key fields are never
+     *     aggregated and always keep the last value
+     * @throws ValidationException if the aggregate function is missing or unknown
+     */
+    static FieldAggregator createFieldAggregator(
+            LogicalType fieldType, String strAgg, boolean isPrimaryKey) {
+        if (isPrimaryKey) {
+            // Records are merged by primary key, so key fields themselves do not aggregate.
+            return new FieldLastValueAgg(fieldType);
+        }
+        if (strAgg == null) {
+            // Configuration.getString(...) returns null for an unset option with no default;
+            // switching on a null String would throw an uninformative NullPointerException.
+            throw new ValidationException(
+                    "Aggregate function is not configured for a non-primary-key field.");
+        }
+        // ordered by type root definition
+        switch (strAgg) {
+            case "sum":
+                return new FieldSumAgg(fieldType);
+            case "max":
+                return new FieldMaxAgg(fieldType);
+            case "min":
+                return new FieldMinAgg(fieldType);
+            case "last_non_null_value":
+                return new FieldLastNonNullValueAgg(fieldType);
+            case "last_value":
+                return new FieldLastValueAgg(fieldType);
+            case "listagg":
+                return new FieldListaggAgg(fieldType);
+            case "bool_or":
+                return new FieldBoolOrAgg(fieldType);
+            case "bool_and":
+                return new FieldBoolAndAgg(fieldType);
+            default:
+                // Include the offending name so misspellings are easy to diagnose.
+                throw new ValidationException("Unsupported aggregate function: " + strAgg);
+        }
+    }
+
+    /** Merges the current accumulator with a new input value and returns the new accumulator. */
+    abstract Object agg(Object accumulator, Object inputField);
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolAndAgg.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolAndAgg.java
new file mode 100644
index 00000000..07dcd0bd
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolAndAgg.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** bool_and aggregate a field of a row. */
+public class FieldBoolAndAgg extends FieldAggregator {
+    public FieldBoolAndAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    /**
+     * Logical AND of accumulator and input. Null values are ignored, so the result is the AND
+     * of all non-null inputs (or null if every input was null).
+     */
+    @Override
+    Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null || inputField == null) {
+            // Ignore nulls: keep whichever side is non-null (null if both are).
+            return (inputField == null) ? accumulator : inputField;
+        }
+        switch (fieldType.getTypeRoot()) {
+            case BOOLEAN:
+                return (boolean) accumulator && (boolean) inputField;
+            default:
+                // Previously a message-less IllegalArgumentException; name the offending
+                // type so the misconfiguration is diagnosable.
+                throw new IllegalArgumentException(
+                        "bool_and aggregate function only supports BOOLEAN type, but the field type is "
+                                + fieldType);
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolOrAgg.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolOrAgg.java
new file mode 100644
index 00000000..73163f0b
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldBoolOrAgg.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** bool_or aggregate a field of a row. */
+public class FieldBoolOrAgg extends FieldAggregator {
+    public FieldBoolOrAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    Object agg(Object accumulator, Object inputField) {
+        Object boolOr;
+
+        if (accumulator == null || inputField == null) {
+            boolOr = (inputField == null) ? accumulator : inputField;
+        } else {
+            switch (fieldType.getTypeRoot()) {
+                case BOOLEAN:
+                    boolOr = (boolean) accumulator || (boolean) inputField;
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+        return boolOr;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
new file mode 100644
index 00000000..9049808a
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** last non-null value aggregate a field of a row. */
+public class FieldLastNonNullValueAgg extends FieldAggregator {
+
+    public FieldLastNonNullValueAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    Object agg(Object accumulator, Object inputField) {
+        return (inputField == null) ? accumulator : inputField;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastValueAgg.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastValueAgg.java
new file mode 100644
index 00000000..266b5a2b
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldLastValueAgg.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** last value aggregate a field of a row. */
+public class FieldLastValueAgg extends FieldAggregator {
+
+    public FieldLastValueAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    Object agg(Object accumulator, Object inputField) {
+        return inputField;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java
new file mode 100644
index 00000000..69b10b5f
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** listagg aggregate a field of a row. */
+public class FieldListaggAgg extends FieldAggregator {
+    // TODO: make it configurable by with clause
+    public static final String DELIMITER = ",";
+
+    public FieldListaggAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    Object agg(Object accumulator, Object inputField) {
+        Object concatenate;
+
+        if (inputField == null || accumulator == null) {
+            concatenate = (inputField == null) ? accumulator : inputField;
+        } else {
+            // ordered by type root definition
+            switch (fieldType.getTypeRoot()) {
+                case VARCHAR:
+                    // TODO: ensure not VARCHAR(n)
+                    StringData mergeFieldSD = (StringData) accumulator;
+                    StringData inFieldSD = (StringData) inputField;
+                    concatenate =
+                            BinaryStringDataUtil.concat(
+                                    (BinaryStringData) mergeFieldSD,
+                                    new BinaryStringData(DELIMITER),
+                                    (BinaryStringData) inFieldSD);
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+        return concatenate;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMaxAgg.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMaxAgg.java
new file mode 100644
index 00000000..cffbc5a6
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMaxAgg.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.store.utils.RowDataUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+/** max aggregate a field of a row. */
+public class FieldMaxAgg extends FieldAggregator {
+    public FieldMaxAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    Object agg(Object accumulator, Object inputField) {
+        Object max;
+
+        if (accumulator == null || inputField == null) {
+            max = (accumulator == null ? inputField : accumulator);
+        } else {
+            LogicalTypeRoot type = fieldType.getTypeRoot();
+            if (RowDataUtils.compare(accumulator, inputField, type) < 0) {
+                max = inputField;
+            } else {
+                max = accumulator;
+            }
+        }
+        return max;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMinAgg.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMinAgg.java
new file mode 100644
index 00000000..8f38112f
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMinAgg.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.store.utils.RowDataUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+/** min aggregate a field of a row. */
+public class FieldMinAgg extends FieldAggregator {
+    public FieldMinAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    Object agg(Object accumulator, Object inputField) {
+        Object min;
+
+        if (accumulator == null || inputField == null) {
+            min = (accumulator == null ? inputField : accumulator);
+        } else {
+            LogicalTypeRoot type = fieldType.getTypeRoot();
+            if (RowDataUtils.compare(accumulator, inputField, type) < 0) {
+                min = accumulator;
+            } else {
+                min = inputField;
+            }
+        }
+        return min;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldSumAgg.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldSumAgg.java
new file mode 100644
index 00000000..234eb5fe
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldSumAgg.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** sum aggregate a field of a row. */
+public class FieldSumAgg extends FieldAggregator {
+    public FieldSumAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    Object agg(Object accumulator, Object inputField) {
+        Object sum;
+
+        if (accumulator == null || inputField == null) {
+            sum = (accumulator == null ? inputField : accumulator);
+        } else {
+            // ordered by type root definition
+            switch (fieldType.getTypeRoot()) {
+                case DECIMAL:
+                    DecimalData mergeFieldDD = (DecimalData) accumulator;
+                    DecimalData inFieldDD = (DecimalData) inputField;
+                    assert mergeFieldDD.scale() == inFieldDD.scale()
+                            : "Inconsistent scale of aggregate DecimalData!";
+                    assert mergeFieldDD.precision() == inFieldDD.precision()
+                            : "Inconsistent precision of aggregate 
DecimalData!";
+                    sum =
+                            DecimalDataUtils.add(
+                                    mergeFieldDD,
+                                    inFieldDD,
+                                    mergeFieldDD.precision(),
+                                    mergeFieldDD.scale());
+                    break;
+                case TINYINT:
+                    sum = (byte) ((byte) accumulator + (byte) inputField);
+                    break;
+                case SMALLINT:
+                    sum = (short) ((short) accumulator + (short) inputField);
+                    break;
+                case INTEGER:
+                    sum = (int) accumulator + (int) inputField;
+                    break;
+                case BIGINT:
+                    sum = (long) accumulator + (long) inputField;
+                    break;
+                case FLOAT:
+                    sum = (float) accumulator + (float) inputField;
+                    break;
+                case DOUBLE:
+                    sum = (double) accumulator + (double) inputField;
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+        return sum;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 8a983479..b72cc634 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.WriteMode;
 import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import 
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
+import 
org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -72,20 +73,28 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
         RowType rowType = tableSchema.logicalRowType();
         Configuration conf = Configuration.fromMap(tableSchema.options());
         CoreOptions.MergeEngine mergeEngine = 
conf.get(CoreOptions.MERGE_ENGINE);
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            fieldGetters[i] = 
RowDataUtils.createNullCheckingFieldGetter(fieldTypes.get(i), i);
+        }
         MergeFunction mergeFunction;
         switch (mergeEngine) {
             case DEDUPLICATE:
                 mergeFunction = new DeduplicateMergeFunction();
                 break;
             case PARTIAL_UPDATE:
-                List<LogicalType> fieldTypes = rowType.getChildren();
-                RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[fieldTypes.size()];
-                for (int i = 0; i < fieldTypes.size(); i++) {
-                    fieldGetters[i] =
-                            
RowDataUtils.createNullCheckingFieldGetter(fieldTypes.get(i), i);
-                }
                 mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
                 break;
+            case AGGREGATE:
+                List<String> fieldNames = tableSchema.fieldNames();
+                List<String> primaryKeys = tableSchema.primaryKeys();
+                mergeFunction =
+                        new AggregateMergeFunction(
+                                fieldGetters,
+                                new AggregateMergeFunction.RowAggregator(
+                                        conf, fieldNames, fieldTypes, 
primaryKeys));
+                break;
             default:
                 throw new UnsupportedOperationException("Unsupported merge 
engine: " + mergeEngine);
         }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregatorTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregatorTest.java
new file mode 100644
index 00000000..34ac0d44
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** test whether {@link FieldAggregator}' subclasses behaviors are expected. */
+public class FieldAggregatorTest {
+
+    @Test
+    public void testFieldBoolAndAgg() {
+        FieldBoolAndAgg fieldBoolAndAgg = new FieldBoolAndAgg(new 
BooleanType());
+        Boolean accumulator = false;
+        Boolean inputField = true;
+        assertThat(fieldBoolAndAgg.agg(accumulator, 
inputField)).isEqualTo(false);
+
+        accumulator = true;
+        inputField = true;
+        assertThat(fieldBoolAndAgg.agg(accumulator, 
inputField)).isEqualTo(true);
+    }
+
+    @Test
+    public void testFieldBoolOrAgg() {
+        FieldBoolOrAgg fieldBoolOrAgg = new FieldBoolOrAgg(new BooleanType());
+        Boolean accumulator = false;
+        Boolean inputField = true;
+        assertThat(fieldBoolOrAgg.agg(accumulator, 
inputField)).isEqualTo(true);
+
+        accumulator = false;
+        inputField = false;
+        assertThat(fieldBoolOrAgg.agg(accumulator, 
inputField)).isEqualTo(false);
+    }
+
+    @Test
+    public void testFieldLastNonNullValueAgg() {
+        FieldLastNonNullValueAgg fieldLastNonNullValueAgg =
+                new FieldLastNonNullValueAgg(new IntType());
+        Integer accumulator = null;
+        Integer inputField = 1;
+        assertThat(fieldLastNonNullValueAgg.agg(accumulator, 
inputField)).isEqualTo(1);
+
+        accumulator = 1;
+        inputField = null;
+        assertThat(fieldLastNonNullValueAgg.agg(accumulator, 
inputField)).isEqualTo(1);
+    }
+
+    @Test
+    public void testFieldLastValueAgg() {
+        FieldLastValueAgg fieldLastValueAgg = new FieldLastValueAgg(new 
IntType());
+        Integer accumulator = null;
+        Integer inputField = 1;
+        assertThat(fieldLastValueAgg.agg(accumulator, 
inputField)).isEqualTo(1);
+
+        accumulator = 1;
+        inputField = null;
+        assertThat(fieldLastValueAgg.agg(accumulator, 
inputField)).isEqualTo(null);
+    }
+
+    @Test
+    public void testFieldListaggAgg() {
+        FieldListaggAgg fieldListaggAgg = new FieldListaggAgg(new 
VarCharType());
+        StringData accumulator = BinaryStringData.fromString("user1");
+        StringData inputField = BinaryStringData.fromString("user2");
+        assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
+                .isEqualTo("user1,user2");
+    }
+
+    @Test
+    public void testFieldMaxAgg() {
+        FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType());
+        Integer accumulator = 1;
+        Integer inputField = 10;
+        assertThat(fieldMaxAgg.agg(accumulator, inputField)).isEqualTo(10);
+    }
+
+    @Test
+    public void testFieldMinAgg() {
+        FieldMinAgg fieldMinAgg = new FieldMinAgg(new IntType());
+        Integer accumulator = 1;
+        Integer inputField = 10;
+        assertThat(fieldMinAgg.agg(accumulator, inputField)).isEqualTo(1);
+    }
+
+    @Test
+    public void testFieldSumAgg() {
+        FieldSumAgg fieldSumAgg = new FieldSumAgg(new IntType());
+        Integer accumulator = 1;
+        Integer inputField = 10;
+        assertThat(fieldSumAgg.agg(accumulator, inputField)).isEqualTo(11);
+    }
+}

Reply via email to