This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 264180d03 [core] Support default aggregate function for partial update
and aggregate merge function (#3374)
264180d03 is described below
commit 264180d03fa9e8573fd5d3d34e328b5ee582349c
Author: Fang Yong <[email protected]>
AuthorDate: Fri May 24 17:37:12 2024 +0800
[core] Support default aggregate function for partial update and aggregate
merge function (#3374)
---
docs/content/primary-key-table/merge-engine.md | 28 +++++++
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 16 ++++
.../compact/PartialUpdateMergeFunction.java | 11 +++
.../compact/aggregate/AggregateMergeFunction.java | 3 +
.../org/apache/paimon/schema/SchemaValidation.java | 4 +-
.../compact/PartialUpdateMergeFunctionTest.java | 29 +++++++
.../aggregate/AggregateMergeFunctionTest.java | 69 +++++++++++++++++
.../apache/paimon/flink/PartialUpdateITCase.java | 76 +++++++++++++++++++
.../apache/paimon/flink/PreAggregationITCase.java | 88 ++++++++++++++++++++++
10 files changed, 329 insertions(+), 1 deletion(-)
diff --git a/docs/content/primary-key-table/merge-engine.md
b/docs/content/primary-key-table/merge-engine.md
index bc6c1ee35..02b5e0b24 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -136,6 +136,34 @@ INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS
INT), 2, 2);
SELECT * FROM t; -- output 1, 2, 1, 2, 3
```
+You can specify a default aggregation function for all the input fields with
`fields.default-aggregate-function`, see example:
+
+```sql
+CREATE TABLE t (
+ k INT,
+ a INT,
+ b INT,
+ c INT,
+ d INT,
+ PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+ 'merge-engine'='partial-update',
+ 'fields.a.sequence-group' = 'b',
+ 'fields.c.sequence-group' = 'd',
+ 'fields.default-aggregate-function' = 'last_non_null_value',
+ 'fields.d.aggregate-function' = 'sum'
+ );
+
+INSERT INTO t VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
+INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
+INSERT INTO t VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
+INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);
+
+
+SELECT * FROM t; -- output 1, 2, 2, 2, 3
+
+```
+
## Aggregation
{{< hint info >}}
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 5e7db5c55..34bf784f8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -218,6 +218,12 @@ under the License.
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a
partitioned table with dynamic partition columns. Works only when the table has
partition keys.</td>
</tr>
+ <tr>
+ <td><h5>fields.default-aggregate-function</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Default aggregate function of all fields for partial-update
and aggregate merge function</td>
+ </tr>
<tr>
<td><h5>file-index.in-manifest-threshold</h5></td>
<td style="word-wrap: break-word;">500 bytes</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index a82166ad7..5927d7c1a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -64,6 +64,7 @@ public class CoreOptions implements Serializable {
public static final String FIELDS_PREFIX = "fields";
public static final String AGG_FUNCTION = "aggregate-function";
+ public static final String DEFAULT_AGG_FUNCTION =
"default-aggregate-function";
public static final String IGNORE_RETRACT = "ignore-retract";
@@ -1151,6 +1152,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Time field for record level expire, it should be
a seconds INT.");
+ public static final ConfigOption<String> FIELDS_DEFAULT_AGG_FUNC =
+ key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Default aggregate function of all fields for
partial-update and aggregate merge function");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1260,7 +1268,15 @@ public class CoreOptions implements Serializable {
return fileFormat.toLowerCase();
}
+ public String fieldsDefaultFunc() {
+ return options.get(FIELDS_DEFAULT_AGG_FUNC);
+ }
+
public boolean definedAggFunc() {
+ if (options.contains(FIELDS_DEFAULT_AGG_FUNC)) {
+ return true;
+ }
+
for (String key : options.toMap().keySet()) {
if (key.startsWith(FIELDS_PREFIX) && key.endsWith(AGG_FUNCTION)) {
return true;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 9ed188709..680e3dd06 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -361,6 +361,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
List<String> fieldNames = rowType.getFieldNames();
List<DataType> fieldTypes = rowType.getFieldTypes();
Map<Integer, FieldAggregator> fieldAggregators = new HashMap<>();
+ String defaultAggFunc = options.fieldsDefaultFunc();
for (int i = 0; i < fieldNames.size(); i++) {
String fieldName = fieldNames.get(i);
DataType fieldType = fieldTypes.get(i);
@@ -379,6 +380,16 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
isPrimaryKey,
options,
fieldName));
+ } else if (defaultAggFunc != null) {
+ fieldAggregators.put(
+ i,
+ FieldAggregator.createFieldAggregator(
+ fieldType,
+ defaultAggFunc,
+ ignoreRetract,
+ isPrimaryKey,
+ options,
+ fieldName));
}
}
return fieldAggregators;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index 7f799e4dc..e73bfe8e9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -131,12 +131,15 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
}
FieldAggregator[] fieldAggregators = new
FieldAggregator[fieldNames.size()];
+ String defaultAggFunc = options.fieldsDefaultFunc();
for (int i = 0; i < fieldNames.size(); i++) {
String fieldName = fieldNames.get(i);
DataType fieldType = fieldTypes.get(i);
// aggregate by primary keys, so they do not aggregate
boolean isPrimaryKey = primaryKeys.contains(fieldName);
String strAggFunc = options.fieldAggFunc(fieldName);
+ strAggFunc = strAggFunc == null ? defaultAggFunc : strAggFunc;
+
boolean ignoreRetract =
options.fieldAggIgnoreRetract(fieldName);
fieldAggregators[i] =
FieldAggregator.createFieldAggregator(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 523fa4363..25b320515 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -49,6 +49,7 @@ import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
@@ -349,7 +350,8 @@ public class SchemaValidation {
if (k.startsWith(FIELDS_PREFIX)) {
String fieldName = k.split("\\.")[1];
checkArgument(
- fieldNames.contains(fieldName),
+ DEFAULT_AGG_FUNCTION.equals(fieldName)
+ ||
fieldNames.contains(fieldName),
String.format(
"Field %s can not be found in
table schema.",
fieldName));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index ed10ef949..fa41607d1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -30,6 +30,7 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
+import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -94,6 +95,34 @@ public class PartialUpdateMergeFunctionTest {
validate(func, 1, null, null, 6, null, null, 6);
}
+ @Test
+ public void testSequenceGroupDefaultAggFunc() {
+ Options options = new Options();
+ options.set("fields.f3.sequence-group", "f1,f2");
+ options.set("fields.f6.sequence-group", "f4,f5");
+ options.set(FIELDS_DEFAULT_AGG_FUNC, "last_non_null_value");
+ RowType rowType =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT());
+ MergeFunction<KeyValue> func =
+ PartialUpdateMergeFunction.factory(options, rowType,
ImmutableList.of("f0"))
+ .create();
+ func.reset();
+ add(func, 1, 1, 1, 1, 1, 1, 1);
+ add(func, 1, 2, 2, 2, 2, 2, null);
+ validate(func, 1, 2, 2, 2, 1, 1, 1);
+ add(func, 1, 3, 3, 1, 3, 3, 3);
+ validate(func, 1, 2, 2, 2, 3, 3, 3);
+ add(func, 1, 4, null, 4, 5, null, 5);
+ validate(func, 1, 4, 2, 4, 5, 3, 5);
+ }
+
@Test
public void testSequenceGroupDefinedNoField() {
Options options = new Options();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
new file mode 100644
index 000000000..4a5f5f8df
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for aggregate merge function. */
+class AggregateMergeFunctionTest {
+ @Test
+ void testDefaultAggFunc() {
+ Options options = new Options();
+ options.set(FIELDS_DEFAULT_AGG_FUNC, "first_non_null_value");
+ options.set("fields.b.aggregate-function", "sum");
+ MergeFunction<KeyValue> aggregateFunction =
+ AggregateMergeFunction.factory(
+ options,
+ Arrays.asList("k", "a", "b", "c", "d"),
+ Arrays.asList(
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT()),
+ Collections.singletonList("k"))
+ .create();
+ aggregateFunction.reset();
+
+ aggregateFunction.add(value(1, null, 1, 1, 1));
+ aggregateFunction.add(value(1, 2, null, 2, 2));
+ aggregateFunction.add(value(1, 3, 3, null, 3));
+ aggregateFunction.add(value(1, 4, 4, 4, null));
+ aggregateFunction.add(value(1, 5, 5, 5, 5));
+
assertThat(aggregateFunction.getResult().value()).isEqualTo(GenericRow.of(1, 2,
13, 1, 1));
+ }
+
+ private KeyValue value(Integer... values) {
+ return new KeyValue()
+ .replace(GenericRow.of(values[0]), RowKind.INSERT,
GenericRow.of(values));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index a156bfb2d..fde5da636 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -215,6 +215,42 @@ public class PartialUpdateITCase extends CatalogITCaseBase
{
assertThat(sql("SELECT c, d FROM
SG")).containsExactlyInAnyOrder(Row.of(5, null));
}
+ @Test
+ public void testSequenceGroupWithDefaultAggFunc() {
+ sql(
+ "CREATE TABLE SG ("
+ + "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2
INT, PRIMARY KEY (k) NOT ENFORCED)"
+ + " WITH ("
+ + "'merge-engine'='partial-update', "
+ + "'fields.g_1.sequence-group'='a,b', "
+ + "'fields.g_2.sequence-group'='c,d', "
+ +
"'fields.default-aggregate-function'='last_non_null_value');");
+
+ sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)");
+
+ // g_2 should not be updated
+ sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))");
+
+ // select *
+ assertThat(sql("SELECT * FROM
SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1));
+
+ // projection
+ assertThat(sql("SELECT c, d FROM
SG")).containsExactlyInAnyOrder(Row.of(1, 1));
+
+ // g_1 should not be updated
+ sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)");
+
+ assertThat(sql("SELECT * FROM
SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3));
+
+ // d should not be updated by null
+ sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))");
+ sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))");
+ sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)");
+
+ assertThat(sql("SELECT a, b FROM
SG")).containsExactlyInAnyOrder(Row.of(4, 4));
+ assertThat(sql("SELECT c, d FROM
SG")).containsExactlyInAnyOrder(Row.of(5, 3));
+ }
+
@Test
public void testInvalidSequenceGroup() {
Assertions.assertThatThrownBy(
@@ -319,6 +355,46 @@ public class PartialUpdateITCase extends CatalogITCaseBase
{
assertThat(sql("SELECT a, b, c FROM
AGG")).containsExactlyInAnyOrder(Row.of(6, 3, null));
}
+ @Test
+ public void testPartialUpdateWithDefaultAndFieldAggregation() {
+ sql(
+ "CREATE TABLE AGG ("
+ + "k INT, a INT, b INT, g_1 INT, c VARCHAR, g_2 INT,
PRIMARY KEY (k) NOT ENFORCED)"
+ + " WITH ("
+ + "'merge-engine'='partial-update', "
+ + "'fields.a.aggregate-function'='sum', "
+ + "'fields.g_1.sequence-group'='a', "
+ + "'fields.g_2.sequence-group'='c', "
+ +
"'fields.default-aggregate-function'='last_non_null_value');");
+ // a in group g_1 with sum agg
+ // b not in group
+ // c in group g_2 without agg
+
+ sql("INSERT INTO AGG VALUES (1, 1, 1, 1, '1', 1)");
+
+ // g_2 should not be updated
+ sql("INSERT INTO AGG VALUES (1, 2, 2, 2, '2', CAST(NULL AS INT))");
+
+ // select *
+ assertThat(sql("SELECT * FROM
AGG")).containsExactlyInAnyOrder(Row.of(1, 3, 2, 2, "1", 1));
+
+ // projection
+ assertThat(sql("SELECT a, c FROM
AGG")).containsExactlyInAnyOrder(Row.of(3, "1"));
+
+ // g_1 should not be updated
+ sql("INSERT INTO AGG VALUES (1, 3, 3, 1, '3', 3)");
+
+ assertThat(sql("SELECT * FROM
AGG")).containsExactlyInAnyOrder(Row.of(1, 6, 3, 2, "3", 3));
+
+ sql(
+ "INSERT INTO AGG VALUES (1, CAST(NULL AS INT), CAST(NULL AS
INT), 2, CAST(NULL AS VARCHAR), 4)");
+
+ // a keep the last accumulator
+ // b is not updated to null
+ // c is updated to "3" for default agg func last_non_null_value
+ assertThat(sql("SELECT a, b, c FROM
AGG")).containsExactlyInAnyOrder(Row.of(6, 3, "3"));
+ }
+
@Test
public void testFirstValuePartialUpdate() {
sql(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index 85f5e93f0..ad7968341 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -1722,4 +1722,92 @@ public class PreAggregationITCase {
}
}
}
+
+ /** ITCase for last non-null value aggregate function. */
+ public static class FieldsDefaultAggregationITCase extends
CatalogITCaseBase {
+ @Override
+ protected int defaultParallelism() {
+ // set parallelism to 1 so that the order of input data is
determined
+ return 1;
+ }
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS test_default_agg_func ("
+ + "j INT, k INT, "
+ + "a INT, "
+ + "b INT, "
+ + "i DATE,"
+ + "PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ +
"'fields.default-aggregate-function'='first_non_null_value', "
+ +
"'fields.i.aggregate-function'='last_non_null_value'"
+ + ");");
+ }
+
+ @Test
+ public void testMergeInMemory() {
+ // VALUES does not guarantee order, but order is important for
last value aggregations.
+ // So we need to sort the input data.
+ batchSql(
+ "CREATE TABLE myTable AS "
+ + "SELECT b, c, d, e, f FROM "
+ + "(VALUES "
+ + " (1, 1, 2, CAST(NULL AS INT), 4,
CAST('2020-01-01' AS DATE)),"
+ + " (2, 1, 2, 2, CAST(NULL as INT),
CAST('2020-01-02' AS DATE)),"
+ + " (3, 1, 2, 3, 5, CAST(NULL AS DATE))"
+ + ") AS V(a, b, c, d, e, f) "
+ + "ORDER BY a");
+ batchSql("INSERT INTO test_default_agg_func SELECT * FROM
myTable");
+ List<Row> result = batchSql("SELECT * FROM test_default_agg_func");
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 2, 2, 4,
LocalDate.of(2020, 1, 2)));
+ }
+
+ @Test
+ public void testMergeRead() {
+ batchSql(
+ "INSERT INTO test_default_agg_func VALUES (1, 2, CAST(NULL
AS INT), 3, CAST('2020-01-01' AS DATE))");
+ batchSql(
+ "INSERT INTO test_default_agg_func VALUES (1, 2, 2,
CAST(NULL AS INT), CAST('2020-01-02' AS DATE))");
+ batchSql("INSERT INTO test_default_agg_func VALUES (1, 2, 3, 5,
CAST(NULL AS DATE))");
+
+ List<Row> result = batchSql("SELECT * FROM test_default_agg_func");
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 2, 2, 3,
LocalDate.of(2020, 1, 2)));
+ }
+
+ @Test
+ public void testMergeCompaction() {
+ // Wait compaction
+ batchSql("ALTER TABLE test_default_agg_func SET
('commit.force-compact'='true')");
+
+ // key 1 2
+ batchSql(
+ "INSERT INTO test_default_agg_func VALUES (1, 2, CAST(NULL
AS INT), 3, CAST('2020-01-01' AS DATE))");
+ batchSql(
+ "INSERT INTO test_default_agg_func VALUES (1, 2, 2,
CAST(NULL AS INT), CAST('2020-01-02' AS DATE))");
+ batchSql("INSERT INTO test_default_agg_func VALUES (1, 2, 3, 5,
CAST(NULL AS DATE))");
+
+ // key 1 3
+ batchSql(
+ "INSERT INTO test_default_agg_func VALUES (1, 3, 3, 4,
CAST('2020-01-01' AS DATE))");
+ batchSql("INSERT INTO test_default_agg_func VALUES (1, 3, 2, 6,
CAST(NULL AS DATE))");
+ batchSql(
+ "INSERT INTO test_default_agg_func VALUES (1, 3, CAST(NULL
AS INT), CAST(NULL AS INT), CAST('2022-01-02' AS DATE))");
+
+ assertThat(batchSql("SELECT * FROM test_default_agg_func"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 2, 2, 3, LocalDate.of(2020, 1, 2)),
+ Row.of(1, 3, 3, 4, LocalDate.of(2022, 1, 2)));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("test_default_agg_func").execute().print(),
+ "Pre-aggregate continuous reading is not supported");
+ }
+ }
}