This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bd3c6fa9 [flink] Support the watermark function of auditlog table 
(#3837)
5bd3c6fa9 is described below

commit 5bd3c6fa9eb5d4b66486f77126742faf1504c4c8
Author: HeavenZH <[email protected]>
AuthorDate: Wed Jul 31 20:21:43 2024 +0800

    [flink] Support the watermark function of auditlog table (#3837)
---
 .../apache/paimon/flink/SystemCatalogTable.java    | 26 ++++++++++++++---
 .../org/apache/paimon/flink/WatermarkITCase.java   | 33 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 4 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
index 30a2b5b2e..d5d843d91 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
@@ -19,17 +19,24 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.AuditLogTable;
 
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.WatermarkSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.types.utils.TypeConversions;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
 
 /** A {@link CatalogTable} to represent system table. */
 public class SystemCatalogTable implements CatalogTable {
@@ -46,10 +53,21 @@ public class SystemCatalogTable implements CatalogTable {
 
     @Override
     public Schema getUnresolvedSchema() {
-        return Schema.newBuilder()
-                .fromRowDataType(
-                        
TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType())))
-                .build();
+        Schema.Builder builder = Schema.newBuilder();
+        builder.fromRowDataType(
+                
TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType())));
+        if (table instanceof AuditLogTable) {
+            Map<String, String> newOptions = new HashMap<>(table.options());
+            if (newOptions.keySet().stream()
+                    .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, 
WATERMARK)))) {
+                WatermarkSpec watermarkSpec = 
deserializeWatermarkSpec(newOptions);
+                return builder.watermark(
+                                watermarkSpec.getRowtimeAttribute(),
+                                watermarkSpec.getWatermarkExpr())
+                        .build();
+            }
+        }
+        return builder.build();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
index c213ef66f..11aef1062 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
@@ -41,6 +41,39 @@ public class WatermarkITCase extends CatalogITCaseBase {
         innerTestWatermark();
     }
 
+    @Test
+    public void testAuditLogWatermark() throws Exception {
+        String[] options =
+                new String[] {
+                    "'scan.watermark.idle-timeout'='1s'",
+                    "'scan.watermark.alignment.group'='group'",
+                    "'scan.watermark.alignment.update-interval'='2s'",
+                    "'scan.watermark.alignment.max-drift'='1s'"
+                };
+        sql(
+                "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS 
ts) WITH ("
+                        + String.join(",", options)
+                        + ")");
+
+        BlockingIterator<Row, Row> select =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT window_start, window_end, SUM(f0) FROM 
TABLE("
+                                        + "TUMBLE(TABLE T$audit_log, 
DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n"
+                                        + "  GROUP BY window_start, 
window_end;"));
+
+        sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')");
+        sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:05')");
+
+        assertThat(select.collect(1))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                LocalDateTime.parse("2023-02-02T12:00"),
+                                LocalDateTime.parse("2023-02-02T12:10"),
+                                1));
+        select.close();
+    }
+
     @Disabled // TODO unstable alignment may block watermark generation
     @Test
     public void testWatermarkAlignment() throws Exception {

Reply via email to