This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch global-counter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 34e22252a682a44cc617450018faaea73e11db43 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Apr 11 23:17:57 2024 +0800 watermark --- .../event/common/watermark/PipeWatermarkEvent.java | 40 ++++++++++++++ .../dataregion/DataRegionWatermarkInjector.java | 61 ++++++++++++++++++++++ .../dataregion/IoTDBDataRegionExtractor.java | 39 ++++++++++++-- .../config/constant/PipeExtractorConstant.java | 4 ++ 4 files changed, 140 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java new file mode 100644 index 00000000000..a46d83e6831 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.watermark; + +import org.apache.iotdb.pipe.api.event.Event; + +public class PipeWatermarkEvent implements Event { + + private final long watermark; + + public PipeWatermarkEvent(long watermark) { + this.watermark = watermark; + } + + public long getWatermark() { + return watermark; + } + + @Override + public String toString() { + return "PipeWatermarkEvent{" + "watermark=" + watermark + '}'; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java new file mode 100644 index 00000000000..64a33b7fe8a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.extractor.dataregion; + +import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent; + +public class DataRegionWatermarkInjector { + + public static final long MIN_INJECTION_INTERVAL_IN_MS = 1000 * 60 * 5; // 5 minutes + + private final long injectionIntervalInMs; + private long nextInjectionTime; + + public DataRegionWatermarkInjector(long injectionIntervalInMs) { + this.injectionIntervalInMs = + Math.max(injectionIntervalInMs, MIN_INJECTION_INTERVAL_IN_MS) + / MIN_INJECTION_INTERVAL_IN_MS + * MIN_INJECTION_INTERVAL_IN_MS; + this.nextInjectionTime = calculateNextInjectionTime(this.injectionIntervalInMs); + } + + public long getInjectionIntervalInMs() { + return injectionIntervalInMs; + } + + public long getNextInjectionTime() { + return nextInjectionTime; + } + + public PipeWatermarkEvent inject() { + if (System.currentTimeMillis() < nextInjectionTime) { + return null; + } + + final PipeWatermarkEvent watermarkEvent = new PipeWatermarkEvent(nextInjectionTime); + nextInjectionTime = calculateNextInjectionTime(injectionIntervalInMs); + return watermarkEvent; + } + + private static long calculateNextInjectionTime(long injectionIntervalInMs) { + final long currentTime = System.currentTimeMillis(); + return currentTime / injectionIntervalInMs * injectionIntervalInMs + injectionIntervalInMs; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index c4d5ac60872..93b16081ea5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -66,6 +66,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; @@ -74,6 +76,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY; public class IoTDBDataRegionExtractor extends IoTDBExtractor { @@ -82,6 +85,8 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { private PipeHistoricalDataRegionExtractor historicalExtractor; private PipeRealtimeDataRegionExtractor realtimeExtractor; + private DataRegionWatermarkInjector watermarkInjector; + private boolean hasNoExtractionNeed = true; @Override @@ -255,6 +260,23 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { historicalExtractor.customize(parameters, configuration); realtimeExtractor.customize(parameters, configuration); + // Set watermark injector + if (parameters.hasAnyAttributes( + EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) { + final long watermarkIntervalInMs = + parameters.getLongOrDefault( + Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY), + EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE); + if (watermarkIntervalInMs > 0) { + watermarkInjector = new DataRegionWatermarkInjector(watermarkIntervalInMs); + LOGGER.info( + "Pipe {}@{}: Set watermark injector with interval {} ms.", + pipeName, + regionId, + watermarkInjector.getInjectionIntervalInMs()); + } + } + // register metric after generating taskID PipeExtractorMetrics.getInstance().register(this); } @@ -348,10 +370,18 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { return null; } - Event event = - historicalExtractor.hasConsumedAll() - ? realtimeExtractor.supply() - : historicalExtractor.supply(); + Event event = null; + if (!historicalExtractor.hasConsumedAll()) { + event = historicalExtractor.supply(); + } else { + if (Objects.nonNull(watermarkInjector)) { + event = watermarkInjector.inject(); + } + if (Objects.isNull(event)) { + event = realtimeExtractor.supply(); + } + } + if (Objects.nonNull(event)) { if (event instanceof TabletInsertionEvent) { PipeExtractorMetrics.getInstance().markTabletEvent(taskID); @@ -361,6 +391,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { PipeExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID); } } + return event; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 9d811d6548a..fc419e6cecc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -79,6 +79,10 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time"; public static final String SOURCE_END_TIME_KEY = "source.end-time"; + public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms"; + public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms"; + public static final long EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE = -1; // -1 means no watermark + private PipeExtractorConstant() { throw new IllegalStateException("Utility class"); }
