SteNicholas commented on code in PR #558:
URL: https://github.com/apache/flink-table-store/pull/558#discussion_r1118546862
##########
flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java:
##########
@@ -701,6 +703,74 @@ public void testAuditLog() throws Exception {
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
}
+ @Test
+ public void testAggMergeFunc() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set("merge-engine", "aggregation");
+ options.set("fields.b.aggregate-function", "sum");
+ options.set("fields.c.aggregate-function", "max");
+ options.set("fields.c.ignore-retract", "true");
+ },
+ rowType);
+ Function<InternalRow, String> rowToString = row -> rowDataToString(row, rowType);
+ DataTableScan scan = table.newScan();
+ TableRead read = table.newRead();
+ TableWrite write = table.newWrite("");
+ TableCommit commit = table.newCommit("");
+
+ // 1. inserts
+ write.write(GenericRow.of(1, 1, 3, 3));
+ write.write(GenericRow.of(1, 1, 1, 1));
+ write.write(GenericRow.of(1, 1, 2, 2));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ List<String> result = getResult(read, scan.plan().splits(), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 6, 3]");
+
+ // 2. Retracts
+ write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 1, 1));
+ write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
+ write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 1, 1));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ result = getResult(read, scan.plan().splits(), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");
+ }
+
+ @Test
+ public void testAggMergeFuncNotAllowRetract() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set("merge-engine", "aggregation");
+ options.set("fields.b.aggregate-function", "sum");
+ options.set("fields.c.aggregate-function", "max");
+ },
+ rowType);
+ TableWrite write = table.newWrite("");
+ write.write(GenericRow.of(1, 1, 3, 3));
+ write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 3, 3));
+ assertThatThrownBy(() -> write.prepareCommit(true, 0))
+ .hasMessageContaining(
+ "Aggregate function 'max' dose not support retraction,"
Review Comment:
```suggestion
"Aggregate function 'max' does not support retraction,"
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]