This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f5cf782df5b Pipe: Two stage aggregate framework &
count-point-processor plugin (#12328)
f5cf782df5b is described below
commit f5cf782df5b8ad6aa23f060308f240ac7d5894bf
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Apr 24 22:14:47 2024 +0800
Pipe: Two stage aggregate framework & count-point-processor plugin (#12328)
---
.../PipeDataRegionProcessorConstructor.java | 3 +
.../agent/receiver/PipeDataNodeReceiverAgent.java | 5 -
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 9 +
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 5 +
.../common/tablet/PipeRawTabletInsertionEvent.java | 5 +
.../common/tsfile/PipeTsFileInsertionEvent.java | 25 ++
.../common/tsfile/TsFileInsertionPointCounter.java | 169 +++++++++
.../common/watermark/PipeWatermarkEvent.java} | 23 +-
.../dataregion/DataRegionWatermarkInjector.java | 76 ++++
.../dataregion/IoTDBDataRegionExtractor.java | 39 ++-
.../PipeHistoricalDataRegionTsFileExtractor.java | 24 +-
.../pipe/processor/twostage/combiner/Combiner.java | 107 ++++++
.../twostage/combiner/PipeCombineHandler.java | 198 +++++++++++
.../combiner/PipeCombineHandlerManager.java | 163 +++++++++
.../twostage/exchange/payload/CombineRequest.java | 139 ++++++++
.../payload/FetchCombineResultRequest.java | 119 +++++++
.../payload/FetchCombineResultResponse.java | 99 ++++++
.../twostage/exchange/payload/RequestType.java} | 36 +-
.../receiver/TwoStageAggregateReceiver.java | 82 +++++
.../exchange/sender/TwoStageAggregateSender.java | 235 +++++++++++++
.../processor/twostage/operator/CountOperator.java | 52 +++
.../processor/twostage/operator/Operator.java} | 16 +-
.../twostage/plugin/TwoStageCountProcessor.java | 388 +++++++++++++++++++++
.../pipe/processor/twostage/state/CountState.java} | 36 +-
.../db/pipe/processor/twostage/state/State.java} | 18 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 5 +-
.../thrift/IoTDBDataNodeReceiverAgent.java | 3 +
.../apache/iotdb/commons/conf/CommonConfig.java | 32 ++
.../iotdb/commons/conf/CommonDescriptor.java | 17 +
.../commons/consensus/index/ProgressIndex.java | 7 +-
.../commons/consensus/index/ProgressIndexType.java | 7 +-
.../consensus/index/impl/HybridProgressIndex.java | 4 +
.../consensus/index/impl/MetaProgressIndex.java | 2 +-
.../consensus/index/impl/StateProgressIndex.java | 234 +++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 25 ++
.../config/constant/PipeExtractorConstant.java | 4 +
.../config/constant/PipeProcessorConstant.java | 2 +
.../request/IoTDBConnectorRequestVersion.java | 1 +
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 2 +
.../twostage/TwoStageCountProcessor.java} | 23 +-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 14 +-
41 files changed, 2363 insertions(+), 90 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index bc33c6d97ea..15578f2fc2a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardS
import
org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
import
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
import
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
+import
org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
@@ -57,5 +58,7 @@ class PipeDataRegionProcessorConstructor extends
PipeProcessorConstructor {
pluginConstructors.put(
BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
TumblingWindowingProcessor::new);
+ pluginConstructors.put(
+ BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(),
TwoStageCountProcessor::new);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
index 555696d4147..31d38d00f6f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
@@ -25,17 +25,12 @@ import
org.apache.iotdb.db.pipe.receiver.protocol.airgap.IoTDBAirGapReceiverAgen
import
org.apache.iotdb.db.pipe.receiver.protocol.legacy.IoTDBLegacyPipeReceiverAgent;
import
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.util.Arrays;
/** {@link PipeDataNodeReceiverAgent} is the entry point of all pipe
receivers' logic. */
public class PipeDataNodeReceiverAgent {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataNodeReceiverAgent.class);
-
private final IoTDBDataNodeReceiverAgent thriftAgent;
private final IoTDBAirGapReceiverAgent airGapAgent;
private final IoTDBLegacyPipeReceiverAgent legacyAgent;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 8c806fe2999..b8ace00dd2f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -411,4 +411,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// Set pipe meta status to STOPPED
pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}
+
+ ///////////////////////// Utils /////////////////////////
+
+ /**
+ * Looks up the meta of the pipe named {@code pipeName} and returns the key set of its
+ * consensus-group-id-to-task-meta map.
+ *
+ * @return an empty set when no meta is found for {@code pipeName} or when the found meta's
+ * creation time differs from {@code creationTime}; otherwise the map's own {@code keySet()},
+ * returned as-is without copying
+ */
+ public Set<Integer> getPipeTaskRegionIdSet(String pipeName, long
creationTime) {
+ final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+ return pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() !=
creationTime
+ ? Collections.emptySet()
+ : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index c7821e4867f..798cb1da706 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -251,6 +251,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
}
+  /**
+   * Returns the number of cells in the tablet produced by {@code convertToTablet()}: its row
+   * size multiplied by its number of schemas, computed in {@code long} arithmetic.
+   */
+  public long count() {
+    final Tablet convertedTablet = convertToTablet();
+    final long rowCount = convertedTablet.rowSize;
+    return rowCount * convertedTablet.getSchemas().size();
+  }
+
/////////////////////////// parsePatternOrTime ///////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index dccfd0e0a21..f227475e648 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -225,6 +225,11 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
return dataContainer.convertToTablet();
}
+  /**
+   * Returns the number of cells (row size multiplied by number of schemas) of the tablet backing
+   * this event. The tablet is obtained via {@code convertToTablet()} when
+   * {@code shouldParseTimeOrPattern()} is true; otherwise the raw {@code tablet} field is used.
+   */
+  public long count() {
+    final Tablet convertedTablet;
+    if (shouldParseTimeOrPattern()) {
+      convertedTablet = convertToTablet();
+    } else {
+      convertedTablet = tablet;
+    }
+    final long rowCount = convertedTablet.rowSize;
+    return rowCount * convertedTablet.getSchemas().size();
+  }
+
/////////////////////////// parsePatternOrTime ///////////////////////////
public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 8e3fa5bd1f1..c609eb907f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -323,6 +324,30 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
}
+  /**
+   * Counts the data points of this TsFile event.
+   *
+   * <p>When {@code shouldParseTime()} is false, the count is taken from a
+   * {@link TsFileInsertionPointCounter} built over {@code tsFile} and {@code pipePattern}.
+   * Otherwise every event from {@code toTabletInsertionEvents()} is counted individually and
+   * {@code close()} is invoked afterwards, whether or not the iteration succeeded.
+   *
+   * @param skipReportOnCommit when true, {@code skipReportOnCommit()} is called on each tablet
+   *     event visited on the time-parsing path
+   * @return the total number of points
+   * @throws IOException declared for the {@link TsFileInsertionPointCounter} constructor
+   */
+  public long count(boolean skipReportOnCommit) throws IOException {
+    if (!shouldParseTime()) {
+      try (final TsFileInsertionPointCounter pointCounter =
+          new TsFileInsertionPointCounter(tsFile, pipePattern)) {
+        return pointCounter.count();
+      }
+    }
+
+    long total = 0;
+    try {
+      for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
+        final PipeRawTabletInsertionEvent tabletEvent = (PipeRawTabletInsertionEvent) event;
+        total += tabletEvent.count();
+        if (skipReportOnCommit) {
+          tabletEvent.skipReportOnCommit();
+        }
+      }
+    } finally {
+      // Always release the data container opened by toTabletInsertionEvents().
+      close();
+    }
+    return total;
+  }
+
/** Release the resource of {@link TsFileInsertionDataContainer}. */
@Override
public void close() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
new file mode 100644
index 00000000000..086b03c4558
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile;
+
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Counts the data points of a TsFile by summing {@code getStatistics().getCount()} over its
+ * timeseries metadata, optionally restricted to the measurements matched by a
+ * {@link PipePattern}. The count is computed once, inside the constructor; {@link #count()}
+ * only returns the stored value.
+ */
+public class TsFileInsertionPointCounter implements AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionPointCounter.class);
+
+ // Pattern used to select devices/measurements; a null pattern matches everything.
+ private final PipePattern pattern;
+
+ private final TsFileSequenceReader tsFileSequenceReader;
+
+ // Device -> measurements that passed the pattern filter.
+ final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap;
+ // Device -> timeseries metadata as returned by the reader's getAllTimeseriesMetadata(false).
+ final Map<IDeviceID, List<TimeseriesMetadata>>
allDeviceTimeseriesMetadataMap;
+
+ // Set to true by filterDeviceMeasurementsMapByPattern() as soon as any device or measurement
+ // is filtered out; decides which counting method the constructor uses.
+ private boolean shouldParsePattern = false;
+
+ private long count = 0;
+
+ /**
+ * Opens {@code tsFile}, filters its devices/measurements by {@code pattern} and computes the
+ * point count. If any step throws, the reader is closed before the exception is rethrown.
+ *
+ * @param tsFile the TsFile to read
+ * @param pattern the pattern to match against; may be null, which matches everything
+ * @throws IOException declared for failures while reading the file
+ */
+ public TsFileInsertionPointCounter(File tsFile, PipePattern pattern) throws
IOException {
+ this.pattern = pattern;
+
+ try {
+ tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true,
true);
+
+ filteredDeviceMeasurementMap = filterDeviceMeasurementsMapByPattern();
+ allDeviceTimeseriesMetadataMap =
tsFileSequenceReader.getAllTimeseriesMetadata(false);
+
+ // shouldParsePattern was decided as a side effect of the filter call above.
+ if (shouldParsePattern) {
+ countMatchedTimeseriesPoints();
+ } else {
+ countAllTimeseriesPoints();
+ }
+
+ // No longer need this. Help GC.
+ tsFileSequenceReader.clearCachedDeviceMetadata();
+ } catch (Exception e) {
+ close();
+ throw e;
+ }
+ }
+
+ /**
+ * Builds the device -> matched-measurements map from the reader's device/measurement map and
+ * sets {@code shouldParsePattern} when anything is filtered out. Devices with no matched
+ * measurement are omitted from the result.
+ */
+ private Map<IDeviceID, Set<String>> filterDeviceMeasurementsMapByPattern()
throws IOException {
+ final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap =
+ tsFileSequenceReader.getDeviceMeasurementsMap();
+ final Map<IDeviceID, Set<String>> filteredDeviceMeasurementsMap = new
HashMap<>();
+
+ for (Map.Entry<IDeviceID, List<String>> entry :
originalDeviceMeasurementsMap.entrySet()) {
+ // The cast assumes every key is a PlainDeviceID; any other key type fails here.
+ final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID();
+
+ // case 1: for example, pattern is root.a.b or pattern is null and
device is root.a.b.c
+ // in this case, all data can be matched without checking the
measurements
+ if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(deviceId)) {
+ if (!entry.getValue().isEmpty()) {
+ filteredDeviceMeasurementsMap.put(
+ new PlainDeviceID(deviceId), new HashSet<>(entry.getValue()));
+ }
+ }
+
+ // case 2: for example, pattern is root.a.b.c and device is root.a.b
+ // in this case, we need to check the full path
+ else if (pattern.mayOverlapWithDevice(deviceId)) {
+ final Set<String> filteredMeasurements = new HashSet<>();
+
+ for (final String measurement : entry.getValue()) {
+ if (pattern.matchesMeasurement(deviceId, measurement)) {
+ filteredMeasurements.add(measurement);
+ } else {
+ // Parse pattern iff there are measurements filtered out
+ shouldParsePattern = true;
+ }
+ }
+
+ if (!filteredMeasurements.isEmpty()) {
+ filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId),
filteredMeasurements);
+ }
+ }
+
+ // case 3: for example, pattern is root.a.b.c and device is root.a.b.d
+ // in this case, no data can be matched
+ else {
+ // Parse pattern iff there are measurements filtered out
+ shouldParsePattern = true;
+ }
+ }
+
+ return filteredDeviceMeasurementsMap;
+ }
+
+ /**
+ * Adds to {@code count} the statistics count of every timeseries whose device and measurement
+ * id are both present in {@code filteredDeviceMeasurementMap}.
+ */
+ private void countMatchedTimeseriesPoints() {
+ for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>>
deviceTimeseriesMetadataEntry :
+ allDeviceTimeseriesMetadataMap.entrySet()) {
+ final IDeviceID deviceId = deviceTimeseriesMetadataEntry.getKey();
+ if (!filteredDeviceMeasurementMap.containsKey(deviceId)) {
+ continue;
+ }
+
+ final List<TimeseriesMetadata> allTimeseriesMetadata =
+ deviceTimeseriesMetadataEntry.getValue();
+ final Set<String> filteredMeasurements =
filteredDeviceMeasurementMap.get(deviceId);
+ for (final TimeseriesMetadata timeseriesMetadata :
allTimeseriesMetadata) {
+ if
(!filteredMeasurements.contains(timeseriesMetadata.getMeasurementId())) {
+ continue;
+ }
+
+ count += timeseriesMetadata.getStatistics().getCount();
+ }
+ }
+ }
+
+ /** Adds to {@code count} the statistics count of every timeseries in the file. */
+ private void countAllTimeseriesPoints() {
+ for (final List<TimeseriesMetadata> allTimeseriesMetadata :
+ allDeviceTimeseriesMetadataMap.values()) {
+ for (final TimeseriesMetadata timeseriesMetadata :
allTimeseriesMetadata) {
+ count += timeseriesMetadata.getStatistics().getCount();
+ }
+ }
+ }
+
+ /** Returns the point count computed by the constructor. */
+ public long count() {
+ return count;
+ }
+
+ /**
+ * Closes the underlying reader if it was created. An {@link IOException} raised while closing
+ * is logged at WARN level and not propagated.
+ */
+ @Override
+ public void close() {
+ try {
+ if (tsFileSequenceReader != null) {
+ tsFileSequenceReader.close();
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close TsFileSequenceReader", e);
+ }
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
similarity index 65%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
index 9f853dd403e..a46d83e6831 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
@@ -17,19 +17,24 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.event.common.watermark;
-public enum IoTDBConnectorRequestVersion {
- VERSION_1((byte) 1),
- ;
+import org.apache.iotdb.pipe.api.event.Event;
- private final byte version;
+/** An {@link Event} that carries a single immutable {@code long} watermark value. */
+public class PipeWatermarkEvent implements Event {
- IoTDBConnectorRequestVersion(byte type) {
- this.version = type;
+ private final long watermark;
+
+ public PipeWatermarkEvent(long watermark) {
+ this.watermark = watermark;
+ }
+
+ public long getWatermark() {
+ return watermark;
}
- public byte getVersion() {
- return version;
+ @Override
+ public String toString() {
+ return "PipeWatermarkEvent{" + "watermark=" + watermark + '}';
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
new file mode 100644
index 00000000000..5d8bbf615ae
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.extractor.dataregion;
+
+import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Produces {@link PipeWatermarkEvent}s for one data region at wall-clock instants aligned to
+ * multiples of the injection interval.
+ *
+ * <p>The effective interval is the requested interval raised to at least
+ * {@link #MIN_INJECTION_INTERVAL_IN_MS} and then rounded down to a multiple of it.
+ */
+public class DataRegionWatermarkInjector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataRegionWatermarkInjector.class);
+
+  public static final long MIN_INJECTION_INTERVAL_IN_MS = 30 * 1000; // 30s
+
+  private final int regionId;
+
+  private final long injectionIntervalInMs;
+  private long nextInjectionTime;
+
+  /**
+   * @param regionId id of the data region, used only in log output
+   * @param injectionIntervalInMs requested interval in milliseconds; values below
+   *     {@link #MIN_INJECTION_INTERVAL_IN_MS} are raised to it, others are rounded down to a
+   *     multiple of it
+   */
+  public DataRegionWatermarkInjector(int regionId, long injectionIntervalInMs) {
+    this.regionId = regionId;
+    // Integer division followed by multiplication rounds down to a multiple of the minimum.
+    this.injectionIntervalInMs =
+        Math.max(injectionIntervalInMs, MIN_INJECTION_INTERVAL_IN_MS)
+            / MIN_INJECTION_INTERVAL_IN_MS
+            * MIN_INJECTION_INTERVAL_IN_MS;
+    this.nextInjectionTime = calculateNextInjectionTime(this.injectionIntervalInMs);
+  }
+
+  /** Returns the effective (clamped and rounded) injection interval in milliseconds. */
+  public long getInjectionIntervalInMs() {
+    return injectionIntervalInMs;
+  }
+
+  /** Returns the wall-clock time, in milliseconds, at which the next watermark becomes due. */
+  public long getNextInjectionTime() {
+    return nextInjectionTime;
+  }
+
+  /**
+   * Emits a watermark event if the next injection time has been reached.
+   *
+   * @return a {@link PipeWatermarkEvent} stamped with the injection time that just became due,
+   *     or {@code null} when it is not yet time to inject
+   */
+  public PipeWatermarkEvent inject() {
+    if (System.currentTimeMillis() < nextInjectionTime) {
+      return null;
+    }
+
+    final PipeWatermarkEvent watermarkEvent = new PipeWatermarkEvent(nextInjectionTime);
+    nextInjectionTime = calculateNextInjectionTime(injectionIntervalInMs);
+
+    // Log the timestamp of the event actually emitted, not the already-advanced next slot.
+    LOGGER.info(
+        "Data region {}: Injected watermark event with timestamp: {}",
+        regionId,
+        watermarkEvent.getWatermark());
+    return watermarkEvent;
+  }
+
+  /** Returns the first multiple of {@code injectionIntervalInMs} strictly after the current time. */
+  private static long calculateNextInjectionTime(long injectionIntervalInMs) {
+    final long currentTime = System.currentTimeMillis();
+    return currentTime / injectionIntervalInMs * injectionIntervalInMs + injectionIntervalInMs;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index c8b4d2be95d..b4291925e97 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -66,6 +66,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
@@ -74,6 +76,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
public class IoTDBDataRegionExtractor extends IoTDBExtractor {
@@ -82,6 +85,8 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
private PipeHistoricalDataRegionExtractor historicalExtractor;
private PipeRealtimeDataRegionExtractor realtimeExtractor;
+ private DataRegionWatermarkInjector watermarkInjector;
+
private boolean hasNoExtractionNeed = true;
@Override
@@ -255,6 +260,23 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
historicalExtractor.customize(parameters, configuration);
realtimeExtractor.customize(parameters, configuration);
+ // Set watermark injector
+ if (parameters.hasAnyAttributes(
+ EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
+ final long watermarkIntervalInMs =
+ parameters.getLongOrDefault(
+ Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY,
SOURCE_WATERMARK_INTERVAL_KEY),
+ EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
+ if (watermarkIntervalInMs > 0) {
+ watermarkInjector = new DataRegionWatermarkInjector(regionId,
watermarkIntervalInMs);
+ LOGGER.info(
+ "Pipe {}@{}: Set watermark injector with interval {} ms.",
+ pipeName,
+ regionId,
+ watermarkInjector.getInjectionIntervalInMs());
+ }
+ }
+
// register metric after generating taskID
PipeExtractorMetrics.getInstance().register(this);
}
@@ -348,10 +370,18 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
return null;
}
- Event event =
- historicalExtractor.hasConsumedAll()
- ? realtimeExtractor.supply()
- : historicalExtractor.supply();
+ Event event = null;
+ if (!historicalExtractor.hasConsumedAll()) {
+ event = historicalExtractor.supply();
+ } else {
+ if (Objects.nonNull(watermarkInjector)) {
+ event = watermarkInjector.inject();
+ }
+ if (Objects.isNull(event)) {
+ event = realtimeExtractor.supply();
+ }
+ }
+
if (Objects.nonNull(event)) {
if (event instanceof TabletInsertionEvent) {
PipeExtractorMetrics.getInstance().markTabletEvent(taskID);
@@ -361,6 +391,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
PipeExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
}
}
+
return event;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 9356ccbf0b6..3d1528fe659 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.extractor.dataregion.historical;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -426,12 +427,23 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
private boolean mayTsFileContainUnprocessedData(TsFileResource resource) {
- return startIndex instanceof TimeWindowStateProgressIndex
- // The resource is closed thus the TsFileResource#getFileEndTime() is
safe to use
- ? ((TimeWindowStateProgressIndex) startIndex).getMinTime() <=
resource.getFileEndTime()
- // Some different tsFiles may share the same max progressIndex, thus
tsFiles with an
- // "equals" max progressIndex must be transmitted to avoid data loss
- : !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
+ if (startIndex instanceof TimeWindowStateProgressIndex) {
+ // The resource is closed thus the TsFileResource#getFileEndTime() is
safe to use
+ return ((TimeWindowStateProgressIndex) startIndex).getMinTime() <=
resource.getFileEndTime();
+ }
+
+ if (startIndex instanceof StateProgressIndex) {
+ // Unlike the default branch below, a tsFile whose max progressIndex "equals" the inner
+ // progressIndex is NOT transmitted here: the file is kept only when the inner
+ // progressIndex is neither after nor equal to the file's max progressIndex.
+ // NOTE(review): presumably the saved state already covers such tsFiles -- confirm.
+ final ProgressIndex innerProgressIndex =
+ ((StateProgressIndex) startIndex).getInnerProgressIndex();
+ return
!innerProgressIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+ &&
!innerProgressIndex.equals(resource.getMaxProgressIndexAfterClose());
+ }
+
+ // Some different tsFiles may share the same max progressIndex, thus
tsFiles with an
+ // "equals" max progressIndex must be transmitted to avoid data loss
+ return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
}
private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource
resource) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
new file mode 100644
index 00000000000..136e760e44f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Folds the {@link State}s reported by individual regions into a single {@link Operator} and
+ * marks itself complete once every expected region id has been received.
+ */
+public class Combiner {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Combiner.class);
+
+  private static final long MAX_COMBINER_LIVE_TIME_IN_MS =
+      PipeConfig.getInstance().getTwoStageAggregateMaxCombinerLiveTimeInMs();
+  private final long creationTimeInMs;
+
+  private final Operator operator;
+
+  private final ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap;
+  private final Set<Integer> receivedRegionIdSet;
+
+  private final AtomicBoolean isComplete = new AtomicBoolean(false);
+
+  /**
+   * @param operator the operator that receives every combined state
+   * @param expectedRegionId2DataNodeIdMap map whose key set is read on every
+   *     {@link #combine(int, State)} call to decide which region ids are expected
+   */
+  public Combiner(
+      Operator operator, ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap) {
+    this.creationTimeInMs = System.currentTimeMillis();
+    this.receivedRegionIdSet = new HashSet<>();
+    this.expectedRegionId2DataNodeIdMap = expectedRegionId2DataNodeIdMap;
+    this.operator = operator;
+  }
+
+  /**
+   * Records {@code regionId} as received and feeds {@code state} to the operator. When all
+   * expected region ids have been received, the operator's {@code onComplete()} is invoked and
+   * this combiner is flagged complete.
+   *
+   * @return a {@code PIPE_ERROR} status when the expected region id set is currently empty
+   *     (nothing is recorded in that case); a {@code SUCCESS_STATUS} status otherwise
+   */
+  public TSStatus combine(int regionId, State state) {
+    // Copy the key set once so the checks and the log output below all use the same snapshot.
+    final Set<Integer> expectedSnapshot = new HashSet<>(expectedRegionId2DataNodeIdMap.keySet());
+    if (expectedSnapshot.isEmpty()) {
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPE_ERROR, "Expected region id set is empty. Sender should try again.");
+    }
+
+    receivedRegionIdSet.add(regionId);
+    operator.combine(state);
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Combiner combine: regionId: {}, state: {}, receivedRegionIdSet: {}, expectedRegionIdSet: {}",
+          regionId,
+          state,
+          receivedRegionIdSet,
+          expectedSnapshot);
+    }
+
+    final boolean allRegionsReceived = receivedRegionIdSet.containsAll(expectedSnapshot);
+    if (!allRegionsReceived) {
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+
+    operator.onComplete();
+    isComplete.set(true);
+
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info(
+          "Combiner combine completed: regionId: {}, state: {}, receivedRegionIdSet: {}, expectedRegionIdSet: {}",
+          regionId,
+          state,
+          receivedRegionIdSet,
+          expectedSnapshot);
+    }
+
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** Returns true once this combiner has lived longer than the configured maximum live time. */
+  public boolean isOutdated() {
+    final long ageInMs = System.currentTimeMillis() - creationTimeInMs;
+    return ageInMs > MAX_COMBINER_LIVE_TIME_IN_MS;
+  }
+
+  /** Returns true after a {@link #combine(int, State)} call has seen all expected region ids. */
+  public boolean isComplete() {
+    return isComplete.get();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
new file mode 100644
index 00000000000..55ef5afbfe9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+/**
+ * Receiver-side coordinator of the two-stage aggregation for a single pipe, identified by {@code
+ * pipeName} and {@code creationTime}.
+ *
+ * <p>It keeps one {@link Combiner} per combine id, plus the region id to data node id map that
+ * those combiners wait for. The leader data regions of the cluster are cached in a static map
+ * shared by all handlers; the cache is re-fetched from the ConfigNode once it is older than {@code
+ * PipeConfig#getTwoStageAggregateDataRegionInfoCacheTimeInMs()}.
+ */
+public class PipeCombineHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeCombineHandler.class);
+
+  private final String pipeName;
+  private final long creationTime;
+
+  private final Function<String, Operator> /* <combineId, operator> */ operatorConstructor;
+
+  // Cluster-wide <data region id, leader data node id> cache. Every access happens while holding
+  // the monitor of the map itself.
+  private static final Map<Integer, Integer> ALL_REGION_ID_2_DATANODE_ID_MAP = new HashMap<>();
+  private static final AtomicLong ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME =
+      new AtomicLong(0);
+
+  // The subset of the cache that belongs to this pipe. The same instance is shared with every
+  // combiner created by this handler.
+  private final ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Combiner> combineId2Combiner = new ConcurrentHashMap<>();
+
+  /**
+   * Creates the handler and immediately resolves the regions it expects.
+   *
+   * @param pipeName name of the pipe this handler serves
+   * @param creationTime creation time of the pipe
+   * @param operatorConstructor builds the operator of a new combiner from its combine id
+   * @throws PipeException if the region information cannot be fetched
+   */
+  public PipeCombineHandler(
+      String pipeName, long creationTime, Function<String, Operator> operatorConstructor) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+
+    this.operatorConstructor = operatorConstructor;
+
+    fetchAndUpdateExpectedRegionId2DataNodeIdMap();
+  }
+
+  /**
+   * Routes the state reported by {@code regionId} to the combiner of {@code combineId}, creating
+   * that combiner on first use.
+   *
+   * @return the status returned by {@link Combiner#combine(int, State)}
+   */
+  public synchronized TSStatus combine(int regionId, String combineId, State state) {
+    return combineId2Combiner
+        .computeIfAbsent(
+            combineId,
+            id ->
+                new Combiner(operatorConstructor.apply(combineId), expectedRegionId2DataNodeIdMap))
+        .combine(regionId, state);
+  }
+
+  /**
+   * Reports, for each requested combine id, whether its combiner is complete, still incomplete,
+   * or outdated. A combine id without a combiner is reported as outdated.
+   *
+   * @param combineIdList the combine ids to look up
+   * @return a response carrying one result type per requested combine id
+   * @throws IOException if the response cannot be serialized
+   */
+  public synchronized FetchCombineResultResponse fetchCombineResult(List<String> combineIdList)
+      throws IOException {
+    final Map<String, FetchCombineResultResponse.CombineResultType> combineId2ResultType =
+        new HashMap<>();
+    for (String combineId : combineIdList) {
+      final Combiner combiner = combineId2Combiner.get(combineId);
+
+      if (combiner == null || combiner.isOutdated()) {
+        combineId2ResultType.put(combineId, FetchCombineResultResponse.CombineResultType.OUTDATED);
+        continue;
+      }
+
+      combineId2ResultType.put(
+          combineId,
+          combiner.isComplete()
+              ? FetchCombineResultResponse.CombineResultType.SUCCESS
+              : FetchCombineResultResponse.CombineResultType.INCOMPLETE);
+    }
+
+    return FetchCombineResultResponse.toTPipeTransferResp(combineId2ResultType);
+  }
+
+  /**
+   * Recomputes the regions this pipe expects and replaces the current expectation with them.
+   *
+   * @throws PipeException if the region information cannot be fetched
+   */
+  public void fetchAndUpdateExpectedRegionId2DataNodeIdMap() {
+    updateExpectedRegionId2DataNodeIdMap(fetchExpectedRegionId2DataNodeIdMap());
+  }
+
+  /**
+   * Builds the region id to data node id map of this pipe: the entries of the shared leader region
+   * cache whose region id is also reported by the pipe task agent for this pipe.
+   */
+  private Map<Integer, Integer> fetchExpectedRegionId2DataNodeIdMap() {
+    synchronized (ALL_REGION_ID_2_DATANODE_ID_MAP) {
+      refreshAllRegionId2DataNodeIdMapIfExpired();
+
+      final Set<Integer> pipeRelatedRegionIdSet =
+          new HashSet<>(PipeAgent.task().getPipeTaskRegionIdSet(pipeName, creationTime));
+      pipeRelatedRegionIdSet.removeIf(
+          regionId -> !ALL_REGION_ID_2_DATANODE_ID_MAP.containsKey(regionId));
+      if (LOGGER.isInfoEnabled()) {
+        LOGGER.info(
+            "Two stage aggregate pipe (pipeName={}, creationTime={}) related region ids {}",
+            pipeName,
+            creationTime,
+            pipeRelatedRegionIdSet);
+      }
+      return ALL_REGION_ID_2_DATANODE_ID_MAP.entrySet().stream()
+          .filter(entry -> pipeRelatedRegionIdSet.contains(entry.getKey()))
+          .collect(
+              HashMap::new,
+              (map, entry) -> map.put(entry.getKey(), entry.getValue()),
+              HashMap::putAll);
+    }
+  }
+
+  /**
+   * Re-fetches the leader data regions from the ConfigNode when the shared cache has expired. The
+   * caller must hold the monitor of {@code ALL_REGION_ID_2_DATANODE_ID_MAP}.
+   *
+   * <p>On failure the last-update time is left untouched, so the next call fetches again.
+   */
+  private static void refreshAllRegionId2DataNodeIdMapIfExpired() {
+    if (System.currentTimeMillis() - ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.get()
+        <= PipeConfig.getInstance().getTwoStageAggregateDataRegionInfoCacheTimeInMs()) {
+      return;
+    }
+
+    ALL_REGION_ID_2_DATANODE_ID_MAP.clear();
+
+    try (final ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      final TShowRegionResp showRegionResp =
+          configNodeClient.showRegion(
+              new TShowRegionReq().setConsensusGroupType(TConsensusGroupType.DataRegion));
+      if (showRegionResp == null || !showRegionResp.isSetRegionInfoList()) {
+        throw new PipeException("Failed to fetch data region ids");
+      }
+      for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+        // Only the leader replica of each region is recorded.
+        if (!RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+          continue;
+        }
+        ALL_REGION_ID_2_DATANODE_ID_MAP.put(
+            regionInfo.getConsensusGroupId().getId(), regionInfo.getDataNodeId());
+      }
+    } catch (ClientManagerException | TException e) {
+      throw new PipeException("Failed to fetch data nodes", e);
+    }
+
+    ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.set(System.currentTimeMillis());
+
+    LOGGER.info(
+        "Fetched data region ids {} at {}",
+        ALL_REGION_ID_2_DATANODE_ID_MAP,
+        ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.get());
+  }
+
+  /** Replaces the content of the expected map; synchronized with {@code combine}. */
+  private synchronized void updateExpectedRegionId2DataNodeIdMap(
+      Map<Integer, Integer> newExpectedRegionId2DataNodeIdMap) {
+    expectedRegionId2DataNodeIdMap.clear();
+    expectedRegionId2DataNodeIdMap.putAll(newExpectedRegionId2DataNodeIdMap);
+  }
+
+  /** Returns a copy of the ids of the data nodes hosting the expected regions. */
+  public synchronized Set<Integer> getExpectedDataNodeIdSet() {
+    return new HashSet<>(expectedRegionId2DataNodeIdMap.values());
+  }
+
+  /**
+   * Removes every combiner that is outdated. A removed combiner that never completed is logged;
+   * combiners that are still within their lifetime are neither removed nor logged.
+   */
+  public synchronized void cleanOutdatedCombiner() {
+    combineId2Combiner
+        .entrySet()
+        .removeIf(
+            entry -> {
+              final Combiner combiner = entry.getValue();
+              if (!combiner.isOutdated()) {
+                return false;
+              }
+              // Log only combiners that are really being evicted without having completed.
+              if (!combiner.isComplete()) {
+                LOGGER.info(
+                    "Clean outdated incomplete combiner: pipeName={}, creationTime={}, combineId={}",
+                    pipeName,
+                    creationTime,
+                    entry.getKey());
+              }
+              return true;
+            });
+  }
+
+  /** Drops all combiners held by this handler. */
+  public synchronized void close() {
+    combineId2Combiner.clear();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
new file mode 100644
index 00000000000..c08cd23d0c8
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class PipeCombineHandlerManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeCombineHandlerManager.class);
+
+ private final ConcurrentMap<String, PipeCombineHandler>
pipeId2CombineHandler =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, AtomicInteger> pipeId2ReferenceCount =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Object> pipeId2LastCombinedValue = new
ConcurrentHashMap<>();
+
+ public synchronized void register(
+ String pipeName, long creationTime, Function<String, Operator>
operatorConstructor) {
+ final String pipeId = generatePipeId(pipeName, creationTime);
+
+ pipeId2CombineHandler.putIfAbsent(
+ pipeId, new PipeCombineHandler(pipeName, creationTime,
operatorConstructor));
+
+ pipeId2ReferenceCount.putIfAbsent(pipeId, new AtomicInteger(0));
+ pipeId2ReferenceCount.get(pipeId).incrementAndGet();
+ }
+
+ public synchronized void deregister(String pipeName, long creationTime) {
+ final String pipeId = generatePipeId(pipeName, creationTime);
+
+ if (pipeId2ReferenceCount.containsKey(pipeId)
+ && pipeId2ReferenceCount.get(pipeId).decrementAndGet() <= 0) {
+ pipeId2LastCombinedValue.remove(pipeId);
+
+ pipeId2ReferenceCount.remove(pipeId);
+
+ try {
+ pipeId2CombineHandler.remove(pipeId).close();
+ } catch (Exception e) {
+ LOGGER.warn("Error occurred when closing CombineHandler(id = {})",
pipeId, e);
+ }
+ }
+ }
+
+ public Object getLastCombinedValue(String pipeName, long creationTime) {
+ return pipeId2LastCombinedValue.get(generatePipeId(pipeName,
creationTime));
+ }
+
+ public void updateLastCombinedValue(
+ String pipeName, long creationTime, Object lastCombinedValue) {
+ pipeId2LastCombinedValue.put(generatePipeId(pipeName, creationTime),
lastCombinedValue);
+ }
+
+ public synchronized Set<Integer> getExpectedDataNodeIdSet(String pipeName,
long creationTime) {
+ final PipeCombineHandler handler =
+ pipeId2CombineHandler.get(generatePipeId(pipeName, creationTime));
+ return Objects.isNull(handler) ? Collections.emptySet() :
handler.getExpectedDataNodeIdSet();
+ }
+
+ public TPipeTransferResp handle(CombineRequest combineRequest) {
+ final String pipeId =
+ generatePipeId(combineRequest.getPipeName(),
combineRequest.getCreationTime());
+
+ final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
+ if (Objects.isNull(handler)) {
+ throw new PipeException("CombineHandler not found for pipeId = " +
pipeId);
+ }
+
+ return new TPipeTransferResp()
+ .setStatus(
+ handler.combine(
+ combineRequest.getRegionId(),
+ combineRequest.getCombineId(),
+ combineRequest.getState()));
+ }
+
+ public FetchCombineResultResponse handle(FetchCombineResultRequest
fetchCombineResultRequest)
+ throws IOException {
+ final String pipeId =
+ generatePipeId(
+ fetchCombineResultRequest.getPipeName(),
fetchCombineResultRequest.getCreationTime());
+
+ final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
+ if (Objects.isNull(handler)) {
+ throw new PipeException("CombineHandler not found for pipeId = " +
pipeId);
+ }
+
+ return
handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList());
+ }
+
+ public void fetchExpectedRegionIdSetAndCleanOutdatedCombiner() {
+ final Map<String, PipeCombineHandler> pipeId2CombineHandlerSnapshot;
+ synchronized (this) {
+ pipeId2CombineHandlerSnapshot = new HashMap<>(pipeId2CombineHandler);
+ }
+
+ pipeId2CombineHandlerSnapshot.forEach(
+ (pipeId, handler) -> {
+ handler.fetchAndUpdateExpectedRegionId2DataNodeIdMap();
+ handler.cleanOutdatedCombiner();
+ });
+ }
+
+ private static String generatePipeId(String pipeName, long creationTime) {
+ return pipeName + "-" + creationTime;
+ }
+
+ /////////////////////////////// Singleton ///////////////////////////////
+
+ private PipeCombineHandlerManager() {
+ PipeAgent.runtime()
+ .registerPeriodicalJob(
+
"CombineHandlerManager#fetchExpectedRegionIdSetAndCleanOutdatedCombiner",
+ this::fetchExpectedRegionIdSetAndCleanOutdatedCombiner,
+
PipeConfig.getInstance().getTwoStageAggregateDataRegionInfoCacheTimeInMs() /
1000 / 2);
+ }
+
+ private static class CombineHandlerManagerHolder {
+ private static final PipeCombineHandlerManager INSTANCE = new
PipeCombineHandlerManager();
+ }
+
+ public static PipeCombineHandlerManager getInstance() {
+ return CombineHandlerManagerHolder.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
new file mode 100644
index 00000000000..cb1ba0b9ad9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Transfer request that carries the partial {@link State} of one region for one combine id.
+ *
+ * <p>The request body is laid out as: pipe name, creation time, region id, combine id, the fully
+ * qualified class name of the state, and finally the bytes written by the state itself.
+ */
+public class CombineRequest extends TPipeTransferReq {
+
+  private String pipeName;
+  private long creationTime;
+  private int regionId;
+  private String combineId;
+
+  private State state;
+
+  private CombineRequest() {
+    // Empty constructor
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  public int getRegionId() {
+    return regionId;
+  }
+
+  public String getCombineId() {
+    return combineId;
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  /**
+   * Builds a request ready to be sent.
+   *
+   * @throws IOException if the body cannot be serialized
+   */
+  public static CombineRequest toTPipeTransferReq(
+      String pipeName, long creationTime, int regionId, String combineId, State state)
+      throws IOException {
+    return new CombineRequest()
+        .convertToTPipeTransferReq(pipeName, creationTime, regionId, combineId, state);
+  }
+
+  /**
+   * Decodes a received transfer request into a combine request.
+   *
+   * @throws Exception if the state class cannot be loaded or instantiated, is not a {@link State},
+   *     or the body cannot be decoded
+   */
+  public static CombineRequest fromTPipeTransferReq(TPipeTransferReq transferReq) throws Exception {
+    return new CombineRequest().translateFromTPipeTransferReq(transferReq);
+  }
+
+  /** Fills the fields of this request and serializes them into the request body. */
+  private CombineRequest convertToTPipeTransferReq(
+      String pipeName, long creationTime, int regionId, String combineId, State state)
+      throws IOException {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+    this.regionId = regionId;
+    this.state = state;
+    this.combineId = combineId;
+
+    this.version = IoTDBConnectorRequestVersion.VERSION_2.getVersion();
+    this.type = RequestType.COMBINE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(pipeName, outputStream);
+      ReadWriteIOUtils.write(creationTime, outputStream);
+      ReadWriteIOUtils.write(regionId, outputStream);
+      ReadWriteIOUtils.write(combineId, outputStream);
+
+      // The class name lets the receiver re-create a state of the same type.
+      ReadWriteIOUtils.write(state.getClass().getName(), outputStream);
+      state.serialize(outputStream);
+
+      this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return this;
+  }
+
+  /** Reads the fields of this request back from the body of {@code transferReq}. */
+  private CombineRequest translateFromTPipeTransferReq(TPipeTransferReq transferReq)
+      throws Exception {
+    pipeName = ReadWriteIOUtils.readString(transferReq.body);
+    creationTime = ReadWriteIOUtils.readLong(transferReq.body);
+    regionId = ReadWriteIOUtils.readInt(transferReq.body);
+    combineId = ReadWriteIOUtils.readString(transferReq.body);
+
+    final String stateClassName = ReadWriteIOUtils.readString(transferReq.body);
+    // The class name comes from the request body. Load the class without running its static
+    // initializers and verify that it is a State before anything of it is instantiated.
+    state =
+        Class.forName(stateClassName, false, CombineRequest.class.getClassLoader())
+            .asSubclass(State.class)
+            .getDeclaredConstructor()
+            .newInstance();
+    state.deserialize(transferReq.body);
+
+    version = transferReq.version;
+    type = transferReq.type;
+    body = transferReq.body;
+
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "CombineRequest{"
+        + "pipeName='"
+        + pipeName
+        + '\''
+        + ", creationTime="
+        + creationTime
+        + ", regionId="
+        + regionId
+        + ", combineId='"
+        + combineId
+        + '\''
+        + ", state="
+        + state
+        + '}';
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
new file mode 100644
index 00000000000..b20904a0e2b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FetchCombineResultRequest extends TPipeTransferReq {
+
+ private String pipeName;
+ private long creationTime;
+ private List<String> combineIdList;
+
+ private FetchCombineResultRequest() {
+ // Empty constructor
+ }
+
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ public List<String> getCombineIdList() {
+ return combineIdList;
+ }
+
+ public static FetchCombineResultRequest toTPipeTransferReq(
+ String pipeName, long creationTime, List<String> combineIdList) throws
IOException {
+ return new FetchCombineResultRequest()
+ .convertToTPipeTransferReq(pipeName, creationTime, combineIdList);
+ }
+
+ public static FetchCombineResultRequest
fromTPipeTransferReq(TPipeTransferReq transferReq)
+ throws Exception {
+ return new
FetchCombineResultRequest().translateFromTPipeTransferReq(transferReq);
+ }
+
+ private FetchCombineResultRequest convertToTPipeTransferReq(
+ String pipeName, long creationTime, List<String> combineIdList) throws
IOException {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ this.combineIdList = combineIdList;
+
+ this.version = IoTDBConnectorRequestVersion.VERSION_2.getVersion();
+ this.type = RequestType.FETCH_COMBINE_RESULT.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ ReadWriteIOUtils.write(creationTime, outputStream);
+
+ ReadWriteIOUtils.write(combineIdList.size(), outputStream);
+ for (String combineId : combineIdList) {
+ ReadWriteIOUtils.write(combineId, outputStream);
+ }
+
+ this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return this;
+ }
+
+ private FetchCombineResultRequest
translateFromTPipeTransferReq(TPipeTransferReq transferReq) {
+ pipeName = ReadWriteIOUtils.readString(transferReq.body);
+ creationTime = ReadWriteIOUtils.readLong(transferReq.body);
+ combineIdList = new ArrayList<>();
+ final int combineIdListSize = ReadWriteIOUtils.readInt(transferReq.body);
+ for (int i = 0; i < combineIdListSize; i++) {
+ combineIdList.add(ReadWriteIOUtils.readString(transferReq.body));
+ }
+
+ version = transferReq.version;
+ type = transferReq.type;
+ body = transferReq.body;
+
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "FetchCombineResultRequest{"
+ + "pipeName='"
+ + pipeName
+ + '\''
+ + ", creationTime="
+ + creationTime
+ + ", combineIdList="
+ + combineIdList
+ + '}';
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
new file mode 100644
index 00000000000..2884eb50225
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FetchCombineResultResponse extends TPipeTransferResp {
+
+ public enum CombineResultType {
+ SUCCESS,
+ INCOMPLETE,
+ OUTDATED,
+ }
+
+ private Map<String, CombineResultType> combineId2ResultType = new
HashMap<>();
+
+ private FetchCombineResultResponse() {
+ // Empty constructor
+ }
+
+ public Map<String, CombineResultType> getCombineId2ResultType() {
+ return combineId2ResultType;
+ }
+
+ public static FetchCombineResultResponse toTPipeTransferResp(
+ Map<String, CombineResultType> combineId2ResultType) throws IOException {
+ final FetchCombineResultResponse response = new
FetchCombineResultResponse();
+
+ response.combineId2ResultType = combineId2ResultType;
+
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(combineId2ResultType.size(), outputStream);
+ for (Map.Entry<String, CombineResultType> entry :
combineId2ResultType.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue().ordinal(), outputStream);
+ }
+
+ response.body =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ response.status = RpcUtils.SUCCESS_STATUS;
+
+ return response;
+ }
+
+ public static FetchCombineResultResponse
fromTPipeTransferResp(TPipeTransferResp transferResp) {
+ final FetchCombineResultResponse response = new
FetchCombineResultResponse();
+
+ response.status = transferResp.status;
+ response.body = transferResp.body;
+
+ response.combineId2ResultType = new HashMap<>();
+ if (response.isSetBody()) {
+ final int size = ReadWriteIOUtils.readInt(transferResp.body);
+ for (int i = 0; i < size; i++) {
+ final String combineId =
ReadWriteIOUtils.readString(transferResp.body);
+ final CombineResultType resultType =
+
CombineResultType.values()[ReadWriteIOUtils.readInt(transferResp.body)];
+ response.combineId2ResultType.put(combineId, resultType);
+ }
+ }
+
+ return response;
+ }
+
+ @Override
+ public String toString() {
+ return "FetchCombineResultResponse{" + "combineId2ResultType=" +
combineId2ResultType + '}';
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
similarity index 50%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
index 9f853dd403e..99f2b16cbb3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
@@ -17,19 +17,39 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
-public enum IoTDBConnectorRequestVersion {
- VERSION_1((byte) 1),
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum RequestType { // Request kinds of the two-stage aggregate exchange; each constant carries a short type code.
+ COMBINE((short) 0), // type code 0
+ FETCH_COMBINE_RESULT((short) 1), // type code 1
;
- private final byte version;
+ private final short type; // the short code returned by getType()
+ // Stores the given short code on the constant.
+ RequestType(short type) {
+ this.type = type;
+ }
+ // Returns the short code this constant was constructed with.
+ public short getType() {
+ return type;
+ }
+ // Lookup table from short code to constant, built once from values().
+ private static final Map<Short, RequestType> TYPE_MAP =
+ Arrays.stream(RequestType.values())
+ .collect(
+ HashMap::new,
+ (typeMap, pipeRequestType) ->
typeMap.put(pipeRequestType.getType(), pipeRequestType),
+ HashMap::putAll);
- IoTDBConnectorRequestVersion(byte type) {
- this.version = type;
+ public static boolean isValidatedRequestType(short type) { // true iff some constant has this code
+ return TYPE_MAP.containsKey(type);
}
- public byte getVersion() {
- return version;
+ public static RequestType valueOf(short type) { // returns null for an unknown code (Map.get semantics)
+ return TYPE_MAP.get(type);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
new file mode 100644
index 00000000000..1e029d7a442
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver;
+
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
+import
org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.RequestType;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receiver of the two-stage aggregate exchange: decodes incoming transfer requests by their
+ * request type and forwards them to the {@link PipeCombineHandlerManager}.
+ */
+public class TwoStageAggregateReceiver implements IoTDBReceiver {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateReceiver.class);
+
+  @Override
+  public IoTDBConnectorRequestVersion getVersion() {
+    return IoTDBConnectorRequestVersion.VERSION_2;
+  }
+
+  /**
+   * Handles one request. Requests with an unknown type are answered with a {@code
+   * PIPE_TYPE_ERROR} status; any exception raised while decoding or handling is logged and
+   * answered with a {@code PIPE_ERROR} status instead of being propagated.
+   */
+  @Override
+  public TPipeTransferResp receive(TPipeTransferReq req) {
+    try {
+      final short rawRequestType = req.getType();
+      final RequestType requestType =
+          RequestType.isValidatedRequestType(rawRequestType)
+              ? RequestType.valueOf(rawRequestType)
+              : null;
+
+      if (requestType == RequestType.COMBINE) {
+        final CombineRequest combineRequest = CombineRequest.fromTPipeTransferReq(req);
+        return PipeCombineHandlerManager.getInstance().handle(combineRequest);
+      }
+      if (requestType == RequestType.FETCH_COMBINE_RESULT) {
+        final FetchCombineResultRequest fetchRequest =
+            FetchCombineResultRequest.fromTPipeTransferReq(req);
+        return PipeCombineHandlerManager.getInstance().handle(fetchRequest);
+      }
+
+      // Neither of the known request types matched.
+      LOGGER.warn("Unknown request type {}: {}.", rawRequestType, req);
+      final String unknownTypeMessage = String.format("Unknown request type %s.", rawRequestType);
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(TSStatusCode.PIPE_TYPE_ERROR, unknownTypeMessage));
+    } catch (Exception e) {
+      LOGGER.warn("Error occurs when receiving request: {}.", req, e);
+      final String errorMessage =
+          String.format("Error occurs when receiving request: %s.", e.getMessage());
+      return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, errorMessage));
+    }
+  }
+
+  @Override
+  public void handleExit() {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Two stage aggregate receiver is exiting.");
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
new file mode 100644
index 00000000000..90c6a2f860a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import
org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TwoStageAggregateSender implements AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TwoStageAggregateSender.class);
+
+ private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+ private final String pipeName;
+ private final long creationTime;
+
+ private static final AtomicLong DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME =
new AtomicLong(0);
+ private static final AtomicReference<Map<Integer, TEndPoint>>
DATANODE_ID_2_END_POINTS =
+ new AtomicReference<>();
+
+ private TEndPoint[] endPoints;
+ private final Map<TEndPoint, IoTDBSyncClient> endPointIoTDBSyncClientMap =
+ new ConcurrentHashMap<>();
+
+ public TwoStageAggregateSender(String pipeName, long creationTime) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ }
+
+ public synchronized TPipeTransferResp request(long watermark,
TPipeTransferReq req)
+ throws TException {
+ final boolean endPointsChanged = tryFetchEndPointsIfNecessary();
+ tryConstructClients(endPointsChanged);
+
+ final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length];
+ IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint);
+ if (client == null) {
+ client = reconstructIoTDBSyncClient(endPoint);
+ }
+
+ LOGGER.info("Sending request {} (watermark = {}) to {}", req, watermark,
endPoint);
+
+ try {
+ return client.pipeTransfer(req);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to send request {} (watermark = {}) to {}", req,
watermark, endPoint, e);
+ try {
+ reconstructIoTDBSyncClient(endPoint);
+ } catch (Exception ex) {
+ LOGGER.warn(
+ "Failed to reconstruct IoTDBSyncClient {} after failure to send
request {} (watermark = {})",
+ endPoint,
+ req,
+ watermark,
+ ex);
+ }
+ throw e;
+ }
+ }
+
+  /**
+   * Refreshes the shared data-node endpoint cache from the config node when it is missing or
+   * older than the configured cache time.
+   *
+   * @return {@code true} if the cache was refreshed by this call, {@code false} if the cached
+   *     value was still usable
+   * @throws PipeException if the data nodes cannot be fetched or none are returned
+   */
+  private static boolean tryFetchEndPointsIfNecessary() {
+    final long now = System.currentTimeMillis();
+
+    // Cheap check without taking the lock.
+    if (isEndPointsCacheFresh(now)) {
+      return false;
+    }
+
+    synchronized (DATANODE_ID_2_END_POINTS) {
+      // Re-check under the lock: another thread may have refreshed the cache in the meantime.
+      if (isEndPointsCacheFresh(now)) {
+        return false;
+      }
+
+      final Map<Integer, TEndPoint> fetchedEndPoints = new HashMap<>();
+      try (final ConfigNodeClient client =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+        final TShowDataNodesResp resp = client.showDataNodes();
+        if (resp == null || resp.getDataNodesInfoList() == null) {
+          throw new PipeException("Failed to fetch data nodes");
+        }
+        for (final TDataNodeInfo info : resp.getDataNodesInfoList()) {
+          final TEndPoint rpcEndPoint = new TEndPoint(info.getRpcAddresss(), info.getRpcPort());
+          fetchedEndPoints.put(info.getDataNodeId(), rpcEndPoint);
+        }
+      } catch (ClientManagerException | TException e) {
+        throw new PipeException("Failed to fetch data nodes", e);
+      }
+
+      if (fetchedEndPoints.isEmpty()) {
+        throw new PipeException("No data nodes' endpoints fetched");
+      }
+
+      DATANODE_ID_2_END_POINTS.set(fetchedEndPoints);
+      DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.set(now);
+    }
+
+    LOGGER.info("Data nodes' endpoints for two-stage aggregation: {}", DATANODE_ID_2_END_POINTS);
+    return true;
+  }
+
+  /** Tells whether an endpoint map is cached and was updated within the configured cache time. */
+  private static boolean isEndPointsCacheFresh(long currentTime) {
+    return DATANODE_ID_2_END_POINTS.get() != null
+        && currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get()
+            < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
+  }
+
+  /**
+   * Rebuilds this sender's endpoint array and client map when the endpoint array has not been
+   * built yet or the shared endpoint cache has just been refreshed.
+   *
+   * <p>The endpoint array is ordered by data node id. {@code request} indexes into this array to
+   * choose the target node, so the order must not depend on the iteration order of the
+   * underlying hash map.
+   *
+   * @param endPointsChanged whether the shared endpoint cache was refreshed by the caller
+   * @throws PipeException if no expected data node id is known for this pipe
+   */
+  private void tryConstructClients(boolean endPointsChanged) {
+    if (Objects.nonNull(endPoints) && !endPointsChanged) {
+      return;
+    }
+
+    final Set<Integer> expectedDataNodeIdSet =
+        PipeCombineHandlerManager.getInstance().getExpectedDataNodeIdSet(pipeName, creationTime);
+    if (expectedDataNodeIdSet.isEmpty()) {
+      throw new PipeException("No expected region id set fetched");
+    }
+
+    // Work on a single snapshot so that the array and the cleanup below see the same mapping.
+    final Map<Integer, TEndPoint> dataNodeId2EndPoints = DATANODE_ID_2_END_POINTS.get();
+
+    endPoints =
+        dataNodeId2EndPoints.entrySet().stream()
+            .filter(entry -> expectedDataNodeIdSet.contains(entry.getKey()))
+            .sorted(Map.Entry.comparingByKey())
+            .map(Map.Entry::getValue)
+            .toArray(TEndPoint[]::new);
+    LOGGER.info(
+        "End points for two-stage aggregation pipe (pipeName={}, creationTime={}) were updated to {}",
+        pipeName,
+        creationTime,
+        endPoints);
+
+    // Open clients for endpoints that do not have one yet. A failure here is only logged;
+    // request() constructs the missing client on demand.
+    for (final TEndPoint endPoint : endPoints) {
+      if (endPointIoTDBSyncClientMap.containsKey(endPoint)) {
+        continue;
+      }
+
+      try {
+        endPointIoTDBSyncClientMap.put(endPoint, constructIoTDBSyncClient(endPoint));
+      } catch (TTransportException e) {
+        LOGGER.warn("Failed to construct IoTDBSyncClient", e);
+      }
+    }
+
+    // Drop clients whose endpoint is no longer part of the cluster. Iterate over a copy of the
+    // key set because entries are removed inside the loop.
+    for (final TEndPoint endPoint : new HashSet<>(endPointIoTDBSyncClientMap.keySet())) {
+      if (!dataNodeId2EndPoints.containsValue(endPoint)) {
+        try {
+          endPointIoTDBSyncClientMap.remove(endPoint).close();
+        } catch (Exception e) {
+          LOGGER.warn("Failed to close IoTDBSyncClient", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Replaces the client registered for {@code endPoint} with a newly constructed one.
+   *
+   * <p>The previous client, if any, is removed from the map and closed first; a failure to
+   * close it is only logged.
+   *
+   * @param endPoint the endpoint whose client is rebuilt
+   * @return the new client, already registered in the client map
+   * @throws TTransportException if the new client cannot be constructed
+   */
+  private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
+      throws TTransportException {
+    final IoTDBSyncClient staleClient = endPointIoTDBSyncClientMap.remove(endPoint);
+    if (staleClient != null) {
+      try {
+        staleClient.close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close old IoTDBSyncClient", e);
+      }
+    }
+
+    final IoTDBSyncClient freshClient = constructIoTDBSyncClient(endPoint);
+    endPointIoTDBSyncClientMap.put(endPoint, freshClient);
+    return freshClient;
+  }
+
+ private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws
TTransportException {
+ return new IoTDBSyncClient(
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs((int)
PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+ .setRpcThriftCompressionEnabled(
+ PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+ .build(),
+ endPoint.getIp(),
+ endPoint.getPort(),
+ false,
+ null,
+ null);
+ }
+
+  /**
+   * Closes every client held by this sender and forgets the endpoint array, so that a later
+   * {@code request} rebuilds both. Failures to close individual clients are only logged.
+   */
+  @Override
+  public void close() {
+    endPointIoTDBSyncClientMap
+        .values()
+        .forEach(
+            client -> {
+              try {
+                client.close();
+              } catch (Exception e) {
+                LOGGER.warn("Failed to close IoTDBSyncClient", e);
+              }
+            });
+
+    endPointIoTDBSyncClientMap.clear();
+    endPoints = null;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
new file mode 100644
index 00000000000..a4eae9d0aa5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.operator;
+
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Queue;
+
+/**
+ * {@link Operator} that sums the counts of all combined {@link CountState}s and, on completion,
+ * publishes the total together with the timestamp parsed from the combine id.
+ */
+public class CountOperator implements Operator {
+
+  private final long onCompletionTimestamp;
+  private long globalCount;
+
+  private final Queue<Pair<Long, Long>> globalCountQueue;
+
+  /**
+   * @param combineId decimal string parsed as the timestamp reported on completion
+   * @param globalCountQueue queue that receives the (timestamp, total count) pair on completion
+   * @throws NumberFormatException if {@code combineId} is not a parsable long
+   */
+  public CountOperator(String combineId, Queue<Pair<Long, Long>> globalCountQueue) {
+    this.onCompletionTimestamp = Long.parseLong(combineId);
+    this.globalCount = 0;
+
+    this.globalCountQueue = globalCountQueue;
+  }
+
+  /** Adds the count carried by {@code state}, which must be a {@link CountState}. */
+  @Override
+  public void combine(State state) {
+    final CountState countState = (CountState) state;
+    globalCount = globalCount + countState.getCount();
+  }
+
+  /** Appends the (timestamp, total count) pair to the output queue. */
+  @Override
+  public void onComplete() {
+    final Pair<Long, Long> timestampAndCount = new Pair<>(onCompletionTimestamp, globalCount);
+    globalCountQueue.add(timestampAndCount);
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
similarity index 73%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
index 9f853dd403e..885c565442b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
@@ -17,19 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.operator;
-public enum IoTDBConnectorRequestVersion {
- VERSION_1((byte) 1),
- ;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
- private final byte version;
+public interface Operator {
- IoTDBConnectorRequestVersion(byte type) {
- this.version = type;
- }
+ void combine(State state);
- public byte getVersion() {
- return version;
- }
+ void onComplete();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
new file mode 100644
index 00000000000..cbc14d976dc
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.plugin;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent;
+import
org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.sender.TwoStageAggregateSender;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.CountOperator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TwoStageCountProcessor implements PipeProcessor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TwoStageCountProcessor.class);
+
+ private String pipeName;
+ private long creationTime;
+ private int regionId;
+ private PipeTaskMeta pipeTaskMeta;
+
+ private PartialPath outputSeries;
+
+ private static final String LOCAL_COUNT_STATE_KEY = "count";
+ private final AtomicLong localCount = new AtomicLong(0);
+ private final AtomicReference<ProgressIndex> localCommitProgressIndex =
+ new AtomicReference<>(MinimumProgressIndex.INSTANCE);
+
+ private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local
count], progress index) */>
+ localRequestQueue = new ConcurrentLinkedQueue<>();
+ private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local
count], progress index) */>
+ localCommitQueue = new ConcurrentLinkedQueue<>();
+
+ private TwoStageAggregateSender twoStageAggregateSender;
+ private final Queue<Pair<Long, Long> /* (timestamp, global count) */>
globalCountQueue =
+ new ConcurrentLinkedQueue<>();
+
+  /**
+   * Checks that the output series attribute is present and is a legal path.
+   *
+   * @param validator gives access to the processor parameters
+   * @throws IllegalArgumentException if the configured output series is not a legal path; the
+   *     underlying {@link IllegalPathException} is attached as the cause
+   */
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+
+    final String rawOutputSeries =
+        validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+    try {
+      PathUtils.isLegalPath(rawOutputSeries);
+    } catch (IllegalPathException e) {
+      // Keep the original exception as the cause so the reason the path is illegal is not lost.
+      throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries, e);
+    }
+  }
+
+  /**
+   * Initializes the processor from its runtime environment and parameters.
+   *
+   * <p>Reads the pipe identity and task meta, parses the output series, restores the local
+   * count and commit progress from the task meta's progress index when one is present,
+   * registers a {@link CountOperator} factory for this pipe and creates the sender.
+   */
+  @Override
+  public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    final PipeTaskProcessorRuntimeEnvironment environment =
+        (PipeTaskProcessorRuntimeEnvironment) configuration.getRuntimeEnvironment();
+    pipeName = environment.getPipeName();
+    creationTime = environment.getCreationTime();
+    regionId = environment.getRegionId();
+    pipeTaskMeta = environment.getPipeTaskMeta();
+
+    final String rawOutputSeries =
+        parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+    outputSeries = new PartialPath(rawOutputSeries);
+
+    if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) {
+      // A minimum progress index is first replaced by a state progress index with no state.
+      if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) {
+        pipeTaskMeta.updateProgressIndex(
+            new StateProgressIndex(Long.MIN_VALUE, new HashMap<>(), MinimumProgressIndex.INSTANCE));
+      }
+
+      final StateProgressIndex restoredIndex = (StateProgressIndex) pipeTaskMeta.getProgressIndex();
+      localCommitProgressIndex.set(restoredIndex.getInnerProgressIndex());
+
+      final Binary persistedCount = restoredIndex.getState().get(LOCAL_COUNT_STATE_KEY);
+      if (Objects.isNull(persistedCount)) {
+        localCount.set(0);
+      } else {
+        localCount.set(Long.parseLong(persistedCount.toString()));
+      }
+    }
+    LOGGER.info(
+        "TwoStageCountProcessor customized by thread {}: pipeName={}, creationTime={}, regionId={}, outputSeries={}, "
+            + "localCommitProgressIndex={}, localCount={}",
+        Thread.currentThread().getName(),
+        pipeName,
+        creationTime,
+        regionId,
+        outputSeries,
+        localCommitProgressIndex.get(),
+        localCount.get());
+
+    PipeCombineHandlerManager.getInstance()
+        .register(
+            pipeName, creationTime, combineId -> new CountOperator(combineId, globalCountQueue));
+    twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime);
+  }
+
+  /**
+   * Adds the point count of a tablet insertion event to the local count and advances the local
+   * commit progress index. Events of any other implementation type are logged and ignored.
+   * Nothing is passed to {@code eventCollector} here.
+   */
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    final boolean isInsertNodeEvent =
+        tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent;
+    final boolean isRawTabletEvent = tabletInsertionEvent instanceof PipeRawTabletInsertionEvent;
+    if (!(isInsertNodeEvent || isRawTabletEvent)) {
+      LOGGER.warn(
+          "Ignored TabletInsertionEvent is not an instance of PipeInsertNodeTabletInsertionEvent or PipeRawTabletInsertionEvent: {}",
+          tabletInsertionEvent);
+      return;
+    }
+
+    final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
+    enrichedEvent.skipReportOnCommit();
+
+    final long pointCount;
+    if (isInsertNodeEvent) {
+      pointCount = ((PipeInsertNodeTabletInsertionEvent) enrichedEvent).count();
+    } else {
+      pointCount = ((PipeRawTabletInsertionEvent) enrichedEvent).count();
+    }
+    localCount.addAndGet(pointCount);
+
+    final ProgressIndex advancedIndex =
+        localCommitProgressIndex
+            .get()
+            .updateToMinimumEqualOrIsAfterProgressIndex(enrichedEvent.getProgressIndex());
+    localCommitProgressIndex.set(advancedIndex);
+  }
+
+  /**
+   * Adds the point count of a TsFile insertion event to the local count and advances the local
+   * commit progress index. Events that are not {@link PipeTsFileInsertionEvent}s, or for which
+   * {@code waitForTsFileClose()} returns {@code false}, are logged and ignored. Nothing is
+   * passed to {@code eventCollector} here.
+   */
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    final boolean isPipeTsFileEvent = tsFileInsertionEvent instanceof PipeTsFileInsertionEvent;
+    if (!isPipeTsFileEvent) {
+      LOGGER.warn(
+          "Ignored TsFileInsertionEvent is not an instance of PipeTsFileInsertionEvent: {}",
+          tsFileInsertionEvent);
+      return;
+    }
+
+    final PipeTsFileInsertionEvent tsFileEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+    tsFileEvent.skipReportOnCommit();
+
+    final boolean tsFileUsable = tsFileEvent.waitForTsFileClose();
+    if (!tsFileUsable) {
+      LOGGER.warn("Ignored TsFileInsertionEvent is empty: {}", tsFileEvent);
+      return;
+    }
+
+    final long pointCount = tsFileEvent.count(true);
+    localCount.addAndGet(pointCount);
+
+    final ProgressIndex advancedIndex =
+        localCommitProgressIndex
+            .get()
+            .updateToMinimumEqualOrIsAfterProgressIndex(tsFileEvent.getProgressIndex());
+    localCommitProgressIndex.set(advancedIndex);
+  }
+
+  /**
+   * Handles the two control events this processor reacts to.
+   *
+   * <p>A heartbeat event emits collected global counts, tries to commit pending local progress
+   * and retries queued combine requests. A watermark event triggers a combine of the current
+   * local count under that watermark. Other events are ignored.
+   */
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws Exception {
+    if (event instanceof PipeHeartbeatEvent) {
+      collectGlobalCountIfNecessary(eventCollector);
+      commitLocalProgressIndexIfNecessary();
+      triggerCombineIfNecessary();
+    }
+
+    if (event instanceof PipeWatermarkEvent) {
+      final long watermark = ((PipeWatermarkEvent) event).getWatermark();
+      final long[] timestampAndLocalCount = {watermark, localCount.get()};
+      triggerCombine(new Pair<>(timestampAndLocalCount, localCommitProgressIndex.get()));
+    }
+  }
+
+  /**
+   * Drains the global count queue, emitting one single-row tablet per (timestamp, count) pair
+   * on the output series and recording the pair as the last combined value for this pipe.
+   *
+   * <p>A count lower than the last combined count is raised to that count before being emitted.
+   *
+   * @param eventCollector receives the generated tablet insertion events
+   * @throws IOException if collecting an event fails
+   */
+  private void collectGlobalCountIfNecessary(EventCollector eventCollector) throws IOException {
+    while (!globalCountQueue.isEmpty()) {
+      final Object lastCombinedValue =
+          PipeCombineHandlerManager.getInstance().getLastCombinedValue(pipeName, creationTime);
+      final Pair<Long, Long> previous;
+      if (Objects.isNull(lastCombinedValue)) {
+        previous = new Pair<>(Long.MIN_VALUE, 0L);
+      } else {
+        previous = (Pair<Long, Long>) lastCombinedValue;
+      }
+
+      final Pair<Long, Long> current = globalCountQueue.poll();
+      if (current.right < previous.right) {
+        current.right = previous.right;
+        LOGGER.warn(
+            "Global count is less than the last collected count: timestamp={}, count={}",
+            current.left,
+            current.right);
+      }
+
+      final String measurement = outputSeries.getMeasurement();
+      final Tablet tablet =
+          new Tablet(
+              outputSeries.getDevice(),
+              Collections.singletonList(new MeasurementSchema(measurement, TSDataType.INT64)),
+              1);
+      tablet.rowSize = 1;
+      tablet.addTimestamp(0, current.left);
+      tablet.addValue(outputSeries.getMeasurement(), 0, current.right);
+
+      final PipeRawTabletInsertionEvent countEvent =
+          new PipeRawTabletInsertionEvent(tablet, false, null, null, null, false);
+      eventCollector.collect(countEvent);
+
+      PipeCombineHandlerManager.getInstance()
+          .updateLastCombinedValue(pipeName, creationTime, current);
+    }
+  }
+
+  /**
+   * Polls the combine result for every entry currently in the local commit queue and commits
+   * the local progress of the entries whose combine has succeeded.
+   *
+   * <p>For each entry the combine result is fetched from the node selected by its timestamp:
+   *
+   * <ul>
+   *   <li>{@code OUTDATED}: the entry is dropped with a warning.
+   *   <li>{@code INCOMPLETE}: the entry is put back into the queue.
+   *   <li>{@code SUCCESS}: the task meta's progress index is updated with the entry's timestamp,
+   *       local count and progress index.
+   * </ul>
+   *
+   * <p>On any failure the entry is put back into the queue and processing stops for this call.
+   */
+  private void commitLocalProgressIndexIfNecessary() {
+    // Only visit the entries present on entry, so re-queued entries are not polled again here.
+    final int currentQueueSize = localCommitQueue.size();
+    for (int i = 0; i < currentQueueSize; i++) {
+      final Pair<long[], ProgressIndex> pair = localCommitQueue.poll();
+      if (Objects.isNull(pair)) {
+        break;
+      }
+
+      try {
+        // TODO: optimize the combine result fetching with batch fetching
+        final FetchCombineResultResponse fetchCombineResultResponse =
+            FetchCombineResultResponse.fromTPipeTransferResp(
+                twoStageAggregateSender.request(
+                    pair.left[0],
+                    FetchCombineResultRequest.toTPipeTransferReq(
+                        pipeName,
+                        creationTime,
+                        Collections.singletonList(Long.toString(pair.left[0])))));
+
+        if (fetchCombineResultResponse.getStatus().getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new PipeException(
+              "Failed to fetch combine result: "
+                  + fetchCombineResultResponse.getStatus().getMessage());
+        }
+
+        for (final Map.Entry<String, FetchCombineResultResponse.CombineResultType> entry :
+            fetchCombineResultResponse.getCombineId2ResultType().entrySet()) {
+          final String combineId = entry.getKey();
+          final FetchCombineResultResponse.CombineResultType resultType = entry.getValue();
+
+          switch (resultType) {
+            case OUTDATED:
+              LOGGER.warn(
+                  "Two stage combine (region id = {}, combine id = {}) outdated: timestamp={}, count={}, progressIndex={}",
+                  regionId,
+                  combineId,
+                  pair.left[0],
+                  pair.left[1],
+                  pair.right);
+              continue;
+            case INCOMPLETE:
+              LOGGER.info(
+                  "Two stage combine (region id = {}, combine id = {}) incomplete: timestamp={}, count={}, progressIndex={}",
+                  regionId,
+                  combineId,
+                  pair.left[0],
+                  pair.left[1],
+                  pair.right);
+              localCommitQueue.add(pair);
+              continue;
+            case SUCCESS:
+              final Map<String, Binary> state = new HashMap<>();
+              // Encode with an explicit charset: the bytes are stored in the progress index and
+              // must not depend on the platform default charset of the writing JVM.
+              state.put(
+                  LOCAL_COUNT_STATE_KEY,
+                  new Binary(
+                      Long.toString(pair.left[1])
+                          .getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+              pipeTaskMeta.updateProgressIndex(
+                  new StateProgressIndex(pair.left[0], state, pair.right));
+              LOGGER.info(
+                  "Two stage combine (region id = {}, combine id = {}) success: timestamp={}, count={}, progressIndex={}, committed progressIndex={}",
+                  regionId,
+                  combineId,
+                  pair.left[0],
+                  pair.left[1],
+                  pair.right,
+                  pipeTaskMeta.getProgressIndex());
+              continue;
+            default:
+              throw new PipeException("Unknown combine result type: " + resultType);
+          }
+        }
+      } catch (Exception e) {
+        // Keep the entry for a later attempt and stop this round.
+        localCommitQueue.add(pair);
+        LOGGER.warn(
+            "Failure occurred when trying to commit progress index. timestamp={}, count={}, progressIndex={}",
+            pair.left[0],
+            pair.left[1],
+            pair.right,
+            e);
+        return;
+      }
+    }
+  }
+
+ private void triggerCombineIfNecessary() {
+ while (!localRequestQueue.isEmpty()) {
+ if (!triggerCombine(localRequestQueue.poll())) {
+ return;
+ }
+ }
+ }
+
+  /**
+   * Sends the local count of {@code pair} as a combine request keyed by its watermark.
+   *
+   * <p>On success the pair is moved to the local commit queue; on any failure it is put into
+   * the local request queue so that it can be retried later.
+   *
+   * @param pair ([watermark, local count], progress index)
+   * @return {@code true} if the combine request was accepted, {@code false} otherwise
+   */
+  private boolean triggerCombine(Pair<long[], ProgressIndex> pair) {
+    final long[] watermarkAndCount = pair.getLeft();
+    final long watermark = watermarkAndCount[0];
+    final long count = watermarkAndCount[1];
+    final ProgressIndex progressIndex = pair.getRight();
+    try {
+      final TPipeTransferResp resp =
+          twoStageAggregateSender.request(
+              watermark,
+              CombineRequest.toTPipeTransferReq(
+                  pipeName, creationTime, regionId, Long.toString(watermark), new CountState(count)));
+      final boolean accepted =
+          resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+      if (!accepted) {
+        throw new PipeException("Failed to combine count: " + resp.getStatus().getMessage());
+      }
+      localCommitQueue.add(pair);
+      return true;
+    } catch (Exception e) {
+      localRequestQueue.add(pair);
+      LOGGER.warn(
+          "Failed to trigger combine. watermark={}, count={}, progressIndex={}",
+          watermark,
+          count,
+          progressIndex,
+          e);
+      return false;
+    }
+  }
+
+  /**
+   * Releases the sender (if it was created) and deregisters this pipe from the combine handler
+   * manager. Deregistration runs even if closing the sender throws.
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      if (Objects.nonNull(twoStageAggregateSender)) {
+        twoStageAggregateSender.close();
+      }
+    } finally {
+      // Always deregister so that a failing sender cannot leave a stale handler registration.
+      PipeCombineHandlerManager.getInstance().deregister(pipeName, creationTime);
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
similarity index 55%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
index 9f853dd403e..0370131e902 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
@@ -17,19 +17,37 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.state;
-public enum IoTDBConnectorRequestVersion {
- VERSION_1((byte) 1),
- ;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
- private final byte version;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
- IoTDBConnectorRequestVersion(byte type) {
- this.version = type;
+public class CountState implements State {
+
+ private long count;
+
+ public CountState() {
+ // For reflection
+ }
+
+ public CountState(long count) {
+ this.count = count;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(count, outputStream);
}
- public byte getVersion() {
- return version;
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ count = ReadWriteIOUtils.readLong(byteBuffer);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
similarity index 73%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
index 9f853dd403e..36d393e9ffa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
@@ -17,19 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.state;
-public enum IoTDBConnectorRequestVersion {
- VERSION_1((byte) 1),
- ;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
- private final byte version;
+public interface State {
- IoTDBConnectorRequestVersion(byte type) {
- this.version = type;
- }
+ void serialize(OutputStream outputStream) throws IOException;
- public byte getVersion() {
- return version;
- }
+ void deserialize(ByteBuffer byteBuffer);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index ea8a1efc618..d8b4c12c733 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -201,8 +201,9 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
receiverId.get(),
status);
return new TPipeTransferResp(status);
- } catch (final IOException e) {
- final String error = String.format("Serialization error during pipe
receiving, %s", e);
+ } catch (Exception e) {
+ final String error =
+ String.format("Exception %s encountered while handling request %s.",
e.getMessage(), req);
LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR,
error));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
index 932123a4260..e0016eceac2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver.TwoStageAggregateReceiver;
public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent {
@@ -31,6 +32,8 @@ public class IoTDBDataNodeReceiverAgent extends
IoTDBReceiverAgent {
protected void initConstructors() {
RECEIVER_CONSTRUCTORS.put(
IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
IoTDBDataNodeReceiver::new);
+ RECEIVER_CONSTRUCTORS.put(
+ IoTDBConnectorRequestVersion.VERSION_2.getVersion(),
TwoStageAggregateReceiver::new);
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5057ea05b98..4e4e76ea192 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -229,6 +229,10 @@ public class CommonConfig {
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
+ private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000; // 8 minutes
+ private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000; // 3 minutes
+ private long twoStageAggregateSenderEndPointsCacheInMs = 3 * 60 * 1000; // 3 minutes
+
private int subscriptionSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
private int subscriptionMaxTabletsPerPrefetching = 16;
@@ -970,6 +974,34 @@ public class CommonConfig {
this.pipeSnapshotExecutionMaxBatchSize = pipeSnapshotExecutionMaxBatchSize;
}
+ public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
+ return twoStageAggregateMaxCombinerLiveTimeInMs;
+ }
+
+ public void setTwoStageAggregateMaxCombinerLiveTimeInMs(
+ long twoStageAggregateMaxCombinerLiveTimeInMs) {
+ this.twoStageAggregateMaxCombinerLiveTimeInMs =
twoStageAggregateMaxCombinerLiveTimeInMs;
+ }
+
+ public long getTwoStageAggregateDataRegionInfoCacheTimeInMs() {
+ return twoStageAggregateDataRegionInfoCacheTimeInMs;
+ }
+
+ public void setTwoStageAggregateDataRegionInfoCacheTimeInMs(
+ long twoStageAggregateDataRegionInfoCacheTimeInMs) {
+ this.twoStageAggregateDataRegionInfoCacheTimeInMs =
+ twoStageAggregateDataRegionInfoCacheTimeInMs;
+ }
+
+ public long getTwoStageAggregateSenderEndPointsCacheInMs() {
+ return twoStageAggregateSenderEndPointsCacheInMs;
+ }
+
+ public void setTwoStageAggregateSenderEndPointsCacheInMs(
+ long twoStageAggregateSenderEndPointsCacheInMs) {
+ this.twoStageAggregateSenderEndPointsCacheInMs =
twoStageAggregateSenderEndPointsCacheInMs;
+ }
+
public int getSubscriptionSubtaskExecutorMaxThreadNum() {
return subscriptionSubtaskExecutorMaxThreadNum;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f64a79e36f4..04bd5f141d7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -516,11 +516,28 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_listening_queue_transfer_snapshot_threshold",
String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));
+
config.setPipeSnapshotExecutionMaxBatchSize(
Integer.parseInt(
properties.getProperty(
"pipe_snapshot_execution_max_batch_size",
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
+
+ config.setTwoStageAggregateMaxCombinerLiveTimeInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "two_stage_aggregate_max_combiner_live_time_in_ms",
+
String.valueOf(config.getTwoStageAggregateMaxCombinerLiveTimeInMs()))));
+ config.setTwoStageAggregateDataRegionInfoCacheTimeInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "two_stage_aggregate_data_region_info_cache_time_in_ms",
+
String.valueOf(config.getTwoStageAggregateDataRegionInfoCacheTimeInMs()))));
+ config.setTwoStageAggregateSenderEndPointsCacheInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "two_stage_aggregate_sender_end_points_cache_in_ms",
+
String.valueOf(config.getTwoStageAggregateSenderEndPointsCacheInMs()))));
}
private void loadSubscriptionProps(Properties properties) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 494d42a7c93..ad82065a363 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.consensus.index;
import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import com.google.common.collect.ImmutableList;
@@ -169,8 +170,10 @@ public abstract class ProgressIndex {
return progressIndex1; // progressIndex1 is not null
}
- return new HybridProgressIndex(progressIndex1)
- .updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2);
+ return progressIndex1 instanceof StateProgressIndex
+ ?
progressIndex1.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2)
+ : new HybridProgressIndex(progressIndex1)
+ .updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2);
}
/**
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
index c5c2ed66345..58548e18c4b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -42,7 +43,7 @@ public enum ProgressIndexType {
HYBRID_PROGRESS_INDEX((short) 5),
META_PROGRESS_INDEX((short) 6),
TIME_WINDOW_STATE_PROGRESS_INDEX((short) 7),
- ;
+ STATE_PROGRESS_INDEX((short) 8);
private final short type;
@@ -79,6 +80,8 @@ public enum ProgressIndexType {
return MetaProgressIndex.deserializeFrom(byteBuffer);
case 7:
return TimeWindowStateProgressIndex.deserializeFrom(byteBuffer);
+ case 8:
+ return StateProgressIndex.deserializeFrom(byteBuffer);
default:
throw new UnsupportedOperationException(
String.format("Unsupported progress index type %s.", indexType));
@@ -102,6 +105,8 @@ public enum ProgressIndexType {
return MetaProgressIndex.deserializeFrom(stream);
case 7:
return TimeWindowStateProgressIndex.deserializeFrom(stream);
+ case 8:
+ return StateProgressIndex.deserializeFrom(stream);
default:
throw new UnsupportedOperationException(
String.format("Unsupported progress index type %s.", indexType));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index c3955ce2811..8e4541e4038 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -173,6 +173,10 @@ public class HybridProgressIndex extends ProgressIndex {
return this;
}
+ if (progressIndex instanceof StateProgressIndex) {
+ return progressIndex.updateToMinimumEqualOrIsAfterProgressIndex(this);
+ }
+
if (!(progressIndex instanceof HybridProgressIndex)) {
type2Index.compute(
progressIndex.getType().getType(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
index 8fb6c268092..679eecbf0eb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
@@ -138,7 +138,7 @@ public class MetaProgressIndex extends ProgressIndex {
lock.writeLock().lock();
try {
if (!(progressIndex instanceof MetaProgressIndex)) {
- return this;
+ return ProgressIndex.blendProgressIndex(this, progressIndex);
}
this.index = Math.max(this.index, ((MetaProgressIndex)
progressIndex).index);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
new file mode 100644
index 00000000000..8b44edfe07d
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A {@link ProgressIndex} that couples an inner {@link ProgressIndex} with a version number and
+ * a keyed {@link Binary} state snapshot.
+ *
+ * <p>Ordering ({@link #isAfter}), equality and hashing look only at the inner progress index and
+ * the version; the state map is carried along and serialized but never compared.
+ *
+ * <p>Serialization, comparison and update take the internal read/write lock around their access
+ * to the mutable fields.
+ */
+public class StateProgressIndex extends ProgressIndex {
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private long version;
+  private Map<String, Binary> state;
+  // Never null: the constructor substitutes MinimumProgressIndex.INSTANCE for a null argument.
+  private ProgressIndex innerProgressIndex;
+
+  /**
+   * Creates a state progress index.
+   *
+   * @param version version of the state snapshot
+   * @param state keyed state snapshot; stored by reference, not copied
+   * @param innerProgressIndex wrapped progress index; {@code null} is replaced by {@link
+   *     MinimumProgressIndex#INSTANCE}
+   */
+  public StateProgressIndex(
+      long version, Map<String, Binary> state, ProgressIndex innerProgressIndex) {
+    this.version = version;
+    this.state = state;
+    // Normalize once here so that serialize/isAfter/equals/update/getTotalOrderSumTuple cannot
+    // hit a NullPointerException; getInnerProgressIndex() already exposed this fallback.
+    this.innerProgressIndex =
+        innerProgressIndex == null ? MinimumProgressIndex.INSTANCE : innerProgressIndex;
+  }
+
+  /** Returns the current version. */
+  public long getVersion() {
+    return version;
+  }
+
+  /** Returns the wrapped progress index; never {@code null}. */
+  public ProgressIndex getInnerProgressIndex() {
+    return innerProgressIndex;
+  }
+
+  /** Returns the state map held by this index (the stored reference, not a copy). */
+  public Map<String, Binary> getState() {
+    return state;
+  }
+
+  /**
+   * Writes, in order: the {@code STATE_PROGRESS_INDEX} type marker, the version, the number of
+   * state entries, each key/value pair, and finally the inner progress index.
+   */
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.STATE_PROGRESS_INDEX.serialize(byteBuffer);
+
+      ReadWriteIOUtils.write(version, byteBuffer);
+
+      ReadWriteIOUtils.write(state.size(), byteBuffer);
+      for (final Map.Entry<String, Binary> entry : state.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+        ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+      }
+
+      innerProgressIndex.serialize(byteBuffer);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Stream variant of {@link #serialize(ByteBuffer)}; writes the same fields in the same order.
+   *
+   * @throws IOException if writing to {@code stream} fails
+   */
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.STATE_PROGRESS_INDEX.serialize(stream);
+
+      ReadWriteIOUtils.write(version, stream);
+
+      ReadWriteIOUtils.write(state.size(), stream);
+      for (final Map.Entry<String, Binary> entry : state.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), stream);
+        ReadWriteIOUtils.write(entry.getValue(), stream);
+      }
+
+      innerProgressIndex.serialize(stream);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Compares this index against {@code progressIndex}.
+   *
+   * <ul>
+   *   <li>{@link MinimumProgressIndex}: the answer of the inner index's {@code isAfter}.
+   *   <li>{@link HybridProgressIndex}: the answer of {@code isGivenProgressIndexAfterSelf} called
+   *       on the hybrid index with the inner index.
+   *   <li>{@link StateProgressIndex}: {@code true} only if the inner index is after the other
+   *       inner index and this version is strictly greater.
+   *   <li>Any other type: {@code false}.
+   * </ul>
+   */
+  @Override
+  public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
+    lock.readLock().lock();
+    try {
+      if (progressIndex instanceof MinimumProgressIndex) {
+        return innerProgressIndex.isAfter(progressIndex);
+      }
+
+      if (progressIndex instanceof HybridProgressIndex) {
+        return ((HybridProgressIndex) progressIndex)
+            .isGivenProgressIndexAfterSelf(innerProgressIndex);
+      }
+
+      if (!(progressIndex instanceof StateProgressIndex)) {
+        return false;
+      }
+
+      final StateProgressIndex that = (StateProgressIndex) progressIndex;
+      return innerProgressIndex.isAfter(that.innerProgressIndex) && version > that.version;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns {@code true} if {@code progressIndex} is a {@link StateProgressIndex} with an equal
+   * inner index and the same version. The state map is not compared.
+   */
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    lock.readLock().lock();
+    try {
+      if (!(progressIndex instanceof StateProgressIndex)) {
+        return false;
+      }
+
+      final StateProgressIndex that = (StateProgressIndex) progressIndex;
+      return innerProgressIndex.equals(that.innerProgressIndex) && version == that.version;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is needed. The explicit cast
+    // selects the equals(ProgressIndex) overload.
+    return obj instanceof StateProgressIndex && this.equals((ProgressIndex) obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(innerProgressIndex, version);
+  }
+
+  /**
+   * Merges {@code progressIndex} into this instance in place and returns {@code this}.
+   *
+   * <p>The inner index is replaced by the result of its own {@code
+   * updateToMinimumEqualOrIsAfterProgressIndex} against the other inner index (when {@code
+   * progressIndex} is a {@link StateProgressIndex}) or against {@code progressIndex} itself. If
+   * the other {@link StateProgressIndex} has a version greater than or equal to this one, its
+   * version and its state map reference are adopted.
+   */
+  @Override
+  public ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
+    lock.writeLock().lock();
+    try {
+      if (progressIndex instanceof StateProgressIndex) {
+        final StateProgressIndex that = (StateProgressIndex) progressIndex;
+        innerProgressIndex =
+            innerProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(that.innerProgressIndex);
+        if (version <= that.version) {
+          version = that.version;
+          state = that.state;
+        }
+      } else {
+        innerProgressIndex =
+            innerProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+      }
+      return this;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public ProgressIndexType getType() {
+    return ProgressIndexType.STATE_PROGRESS_INDEX;
+  }
+
+  /** Returns the total-order sum tuple of the inner progress index. */
+  @Override
+  public TotalOrderSumTuple getTotalOrderSumTuple() {
+    // innerProgressIndex is reassigned under the write lock, so read it under the read lock
+    // like every other reader in this class.
+    lock.readLock().lock();
+    try {
+      return innerProgressIndex.getTotalOrderSumTuple();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Reads the version, the state entries and the inner progress index, in the order {@link
+   * #serialize(ByteBuffer)} writes them. The leading type marker is not read here; the caller is
+   * expected to have consumed it already.
+   */
+  public static StateProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final long version = ReadWriteIOUtils.readLong(byteBuffer);
+
+    final Map<String, Binary> state = new HashMap<>();
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; ++i) {
+      final String key = ReadWriteIOUtils.readString(byteBuffer);
+      final Binary value = ReadWriteIOUtils.readBinary(byteBuffer);
+      state.put(key, value);
+    }
+
+    final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(byteBuffer);
+
+    return new StateProgressIndex(version, state, progressIndex);
+  }
+
+  /**
+   * Stream variant of {@link #deserializeFrom(ByteBuffer)}.
+   *
+   * @throws IOException if reading from {@code stream} fails
+   */
+  public static StateProgressIndex deserializeFrom(InputStream stream) throws IOException {
+    final long version = ReadWriteIOUtils.readLong(stream);
+
+    final Map<String, Binary> state = new HashMap<>();
+    final int size = ReadWriteIOUtils.readInt(stream);
+    for (int i = 0; i < size; ++i) {
+      final String key = ReadWriteIOUtils.readString(stream);
+      final Binary value = ReadWriteIOUtils.readBinary(stream);
+      state.put(key, value);
+    }
+
+    final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(stream);
+
+    return new StateProgressIndex(version, state, progressIndex);
+  }
+
+  @Override
+  public String toString() {
+    return "StateProgressIndex{"
+        + "version="
+        + version
+        + ", state="
+        + state
+        + ", innerProgressIndex="
+        + innerProgressIndex
+        + '}';
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 6fab25c9b70..7a0391d1dd8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -245,6 +245,20 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMemoryExpanderIntervalSeconds();
}
+ /////////////////////////////// TwoStage ///////////////////////////////
+
+ public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
+ return COMMON_CONFIG.getTwoStageAggregateMaxCombinerLiveTimeInMs();
+ }
+
+ public long getTwoStageAggregateDataRegionInfoCacheTimeInMs() {
+ return COMMON_CONFIG.getTwoStageAggregateDataRegionInfoCacheTimeInMs();
+ }
+
+ public long getTwoStageAggregateSenderEndPointsCacheInMs() {
+ return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);
@@ -337,6 +351,17 @@ public class PipeConfig {
LOGGER.info(
"PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
+ LOGGER.info("PipeMemoryExpanderIntervalSeconds: {}",
getPipeMemoryExpanderIntervalSeconds());
+
+ LOGGER.info(
+ "TwoStageAggregateMaxCombinerLiveTimeInMs: {}",
+ getTwoStageAggregateMaxCombinerLiveTimeInMs());
+ LOGGER.info(
+ "TwoStageAggregateDataRegionInfoCacheTimeInMs: {}",
+ getTwoStageAggregateDataRegionInfoCacheTimeInMs());
+ LOGGER.info(
+ "TwoStageAggregateSenderEndPointsCacheInMs: {}",
+ getTwoStageAggregateSenderEndPointsCacheInMs());
}
/////////////////////////////// Singleton ///////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 80c102a0b3e..05ed098976f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -79,6 +79,10 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time";
public static final String SOURCE_END_TIME_KEY = "source.end-time";
+ public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY =
"extractor.watermark-interval-ms";
+ public static final String SOURCE_WATERMARK_INTERVAL_KEY =
"source.watermark-interval-ms";
+ public static final long EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE = -1; // -1 means no watermark
+
private PipeExtractorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index 719fa3c37e5..49fa2e2cab8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -66,6 +66,8 @@ public class PipeProcessorConstant {
"processor.sdt.max-time-interval";
public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE =
Long.MAX_VALUE;
+ public static final String PROCESSOR_OUTPUT_SERIES_KEY =
"processor.output-series";
+
private PipeProcessorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
index 9f853dd403e..74219ef6b29 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
public enum IoTDBConnectorRequestVersion {
VERSION_1((byte) 1),
+ VERSION_2((byte) 2),
;
private final byte version;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index a7fc2da5946..511282ff329 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothin
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.twostage.TwoStageCountProcessor;
import java.util.Arrays;
import java.util.Collections;
@@ -60,6 +61,7 @@ public enum BuiltinPipePlugin {
SDT_SAMPLING_PROCESSOR("sdt-sampling-processor",
SwingingDoorTrendingSamplingProcessor.class),
THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor",
ThrowingExceptionProcessor.class),
AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class),
+ COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class),
// Hidden-processors, which are plugins of the processors
STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor",
StandardStatisticsProcessor.class),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java
similarity index 59%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java
index 9f853dd403e..b8bf5773cda 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java
@@ -17,19 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor.twostage;
-public enum IoTDBConnectorRequestVersion {
- VERSION_1((byte) 1),
- ;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
- private final byte version;
-
- IoTDBConnectorRequestVersion(byte type) {
- this.version = type;
- }
-
- public byte getVersion() {
- return version;
- }
-}
+/**
+ * This class is a placeholder and should not be instantiated. It represents the
+ * TwoStageCountProcessor. The real implementation lives in the server module and cannot be
+ * imported here. The pipe agent in the server module will replace this class with the real
+ * implementation when initializing the TwoStageCountProcessor.
+ */
+public class TwoStageCountProcessor extends PlaceHolderProcessor {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 91b8cbedbe8..c6c7f12e2e4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -99,7 +99,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
"Receiver id = {}: Original receiver file dir {} was deleted.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to delete original receiver file dir
{}, because {}.",
receiverId.get(),
@@ -278,7 +278,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
try {
return PipeTransferFilePieceResp.toTPipeTransferResp(
status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
- } catch (IOException ex) {
+ } catch (Exception ex) {
return PipeTransferFilePieceResp.toTPipeTransferResp(status);
}
}
@@ -340,7 +340,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
"Receiver id = {}: Current writing file writer {} was closed.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath());
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to close current writing file writer {},
because {}.",
receiverId.get(),
@@ -377,7 +377,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
"Receiver id = {}: Original writing file {} was deleted.",
receiverId.get(),
file.getPath());
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to delete original writing file {},
because {}.",
receiverId.get(),
@@ -454,7 +454,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
status.getMessage());
}
return new TPipeTransferResp(status);
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(),
@@ -534,7 +534,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
status);
}
return new TPipeTransferResp(status);
- } catch (IOException | IllegalPathException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(), files, req, e);
return new TPipeTransferResp(
@@ -701,7 +701,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
"Receiver id = {}: Handling exit: Original receiver file dir {}
was deleted.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Handling exit: Delete original receiver file
dir {} error.",
receiverId.get(),