This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch global-counter
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 34e22252a682a44cc617450018faaea73e11db43
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Apr 11 23:17:57 2024 +0800

    watermark
---
 .../event/common/watermark/PipeWatermarkEvent.java | 40 ++++++++++++++
 .../dataregion/DataRegionWatermarkInjector.java    | 61 ++++++++++++++++++++++
 .../dataregion/IoTDBDataRegionExtractor.java       | 39 ++++++++++++--
 .../config/constant/PipeExtractorConstant.java     |  4 ++
 4 files changed, 140 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
new file mode 100644
index 00000000000..a46d83e6831
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.watermark;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class PipeWatermarkEvent implements Event {
+
+  private final long watermark;
+
+  public PipeWatermarkEvent(long watermark) {
+    this.watermark = watermark;
+  }
+
+  public long getWatermark() {
+    return watermark;
+  }
+
+  @Override
+  public String toString() {
+    return "PipeWatermarkEvent{" + "watermark=" + watermark + '}';
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
new file mode 100644
index 00000000000..64a33b7fe8a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.extractor.dataregion;
+
+import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent;
+
+public class DataRegionWatermarkInjector {
+
+  public static final long MIN_INJECTION_INTERVAL_IN_MS = 1000 * 60 * 5; // 5 
minutes
+
+  private final long injectionIntervalInMs;
+  private long nextInjectionTime;
+
+  public DataRegionWatermarkInjector(long injectionIntervalInMs) {
+    this.injectionIntervalInMs =
+        Math.max(injectionIntervalInMs, MIN_INJECTION_INTERVAL_IN_MS)
+            / MIN_INJECTION_INTERVAL_IN_MS
+            * MIN_INJECTION_INTERVAL_IN_MS;
+    this.nextInjectionTime = 
calculateNextInjectionTime(this.injectionIntervalInMs);
+  }
+
+  public long getInjectionIntervalInMs() {
+    return injectionIntervalInMs;
+  }
+
+  public long getNextInjectionTime() {
+    return nextInjectionTime;
+  }
+
+  public PipeWatermarkEvent inject() {
+    if (System.currentTimeMillis() < nextInjectionTime) {
+      return null;
+    }
+
+    final PipeWatermarkEvent watermarkEvent = new 
PipeWatermarkEvent(nextInjectionTime);
+    nextInjectionTime = calculateNextInjectionTime(injectionIntervalInMs);
+    return watermarkEvent;
+  }
+
+  private static long calculateNextInjectionTime(long injectionIntervalInMs) {
+    final long currentTime = System.currentTimeMillis();
+    return currentTime / injectionIntervalInMs * injectionIntervalInMs + 
injectionIntervalInMs;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index c4d5ac60872..93b16081ea5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -66,6 +66,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
@@ -74,6 +76,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
 
 public class IoTDBDataRegionExtractor extends IoTDBExtractor {
 
@@ -82,6 +85,8 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
   private PipeHistoricalDataRegionExtractor historicalExtractor;
   private PipeRealtimeDataRegionExtractor realtimeExtractor;
 
+  private DataRegionWatermarkInjector watermarkInjector;
+
   private boolean hasNoExtractionNeed = true;
 
   @Override
@@ -255,6 +260,23 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     historicalExtractor.customize(parameters, configuration);
     realtimeExtractor.customize(parameters, configuration);
 
+    // Set watermark injector
+    if (parameters.hasAnyAttributes(
+        EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
+      final long watermarkIntervalInMs =
+          parameters.getLongOrDefault(
+              Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, 
SOURCE_WATERMARK_INTERVAL_KEY),
+              EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
+      if (watermarkIntervalInMs > 0) {
+        watermarkInjector = new 
DataRegionWatermarkInjector(watermarkIntervalInMs);
+        LOGGER.info(
+            "Pipe {}@{}: Set watermark injector with interval {} ms.",
+            pipeName,
+            regionId,
+            watermarkInjector.getInjectionIntervalInMs());
+      }
+    }
+
     // register metric after generating taskID
     PipeExtractorMetrics.getInstance().register(this);
   }
@@ -348,10 +370,18 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
       return null;
     }
 
-    Event event =
-        historicalExtractor.hasConsumedAll()
-            ? realtimeExtractor.supply()
-            : historicalExtractor.supply();
+    Event event = null;
+    if (!historicalExtractor.hasConsumedAll()) {
+      event = historicalExtractor.supply();
+    } else {
+      if (Objects.nonNull(watermarkInjector)) {
+        event = watermarkInjector.inject();
+      }
+      if (Objects.isNull(event)) {
+        event = realtimeExtractor.supply();
+      }
+    }
+
     if (Objects.nonNull(event)) {
       if (event instanceof TabletInsertionEvent) {
         PipeExtractorMetrics.getInstance().markTabletEvent(taskID);
@@ -361,6 +391,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
         PipeExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
       }
     }
+
     return event;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 9d811d6548a..fc419e6cecc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -79,6 +79,10 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time";
   public static final String SOURCE_END_TIME_KEY = "source.end-time";
 
+  public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = 
"extractor.watermark-interval-ms";
+  public static final String SOURCE_WATERMARK_INTERVAL_KEY = 
"source.watermark-interval-ms";
+  public static final long EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE = -1; // 
-1 means no watermark
+
   private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
   }

Reply via email to