This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5bd3c6fa9 [flink] Support the watermark function of auditlog table (#3837)
5bd3c6fa9 is described below
commit 5bd3c6fa9eb5d4b66486f77126742faf1504c4c8
Author: HeavenZH <[email protected]>
AuthorDate: Wed Jul 31 20:21:43 2024 +0800
[flink] Support the watermark function of auditlog table (#3837)
---
.../apache/paimon/flink/SystemCatalogTable.java | 26 ++++++++++++++---
.../org/apache/paimon/flink/WatermarkITCase.java | 33 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 4 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
index 30a2b5b2e..d5d843d91 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
@@ -19,17 +19,24 @@
package org.apache.paimon.flink;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.AuditLogTable;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.types.utils.TypeConversions;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
/** A {@link CatalogTable} to represent system table. */
public class SystemCatalogTable implements CatalogTable {
@@ -46,10 +53,21 @@ public class SystemCatalogTable implements CatalogTable {
@Override
public Schema getUnresolvedSchema() {
- return Schema.newBuilder()
- .fromRowDataType(
- TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType())))
- .build();
+ Schema.Builder builder = Schema.newBuilder();
+ builder.fromRowDataType(
+ TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType())));
+ if (table instanceof AuditLogTable) {
+ Map<String, String> newOptions = new HashMap<>(table.options());
+ if (newOptions.keySet().stream()
+ .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
+ WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions);
+ return builder.watermark(
+ watermarkSpec.getRowtimeAttribute(),
+ watermarkSpec.getWatermarkExpr())
+ .build();
+ }
+ }
+ return builder.build();
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
index c213ef66f..11aef1062 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
@@ -41,6 +41,39 @@ public class WatermarkITCase extends CatalogITCaseBase {
innerTestWatermark();
}
+ @Test
+ public void testAuditLogWatermark() throws Exception {
+ String[] options =
+ new String[] {
+ "'scan.watermark.idle-timeout'='1s'",
+ "'scan.watermark.alignment.group'='group'",
+ "'scan.watermark.alignment.update-interval'='2s'",
+ "'scan.watermark.alignment.max-drift'='1s'"
+ };
+ sql(
+ "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts) WITH ("
+ + String.join(",", options)
+ + ")");
+
+ BlockingIterator<Row, Row> select =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT window_start, window_end, SUM(f0) FROM TABLE("
+ + "TUMBLE(TABLE T$audit_log, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n"
+ + " GROUP BY window_start, window_end;"));
+
+ sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')");
+ sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:05')");
+
+ assertThat(select.collect(1))
+ .containsExactlyInAnyOrder(
+ Row.of(
+ LocalDateTime.parse("2023-02-02T12:00"),
+ LocalDateTime.parse("2023-02-02T12:10"),
+ 1));
+ select.close();
+ }
+
@Disabled // TODO unstable alignment may block watermark generation
@Test
public void testWatermarkAlignment() throws Exception {