This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch global-counter
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 15c01584c5f26e7deae07b5835cc10368fe6d0f8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Apr 11 22:31:19 2024 +0800

    1st
---
 .../agent/receiver/PipeDataNodeReceiverAgent.java  |   5 -
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |   9 ++
 .../processor/twostage/TwoStageCountProcessor.java |  55 ++++++++
 .../pipe/processor/twostage/combiner/Combiner.java |  72 +++++++++++
 .../twostage/combiner/PipeCombineHandler.java      | 107 ++++++++++++++++
 .../combiner/PipeCombineHandlerManager.java        | 138 +++++++++++++++++++++
 .../twostage/exchange/payload/CombineRequest.java  | 119 ++++++++++++++++++
 .../payload/FetchCombineResultRequest.java         | 104 ++++++++++++++++
 .../payload/FetchCombineResultResponse.java        |  89 +++++++++++++
 .../twostage/exchange/payload/RequestType.java}    |  36 ++++--
 .../receiver/TwoStageAggregateReceiver.java        |  80 ++++++++++++
 .../twostage/operator/CountOperator.java}          |  22 ++--
 .../processor/twostage/operator/Operator.java}     |  16 +--
 .../pipe/processor/twostage/state/CountState.java} |  36 ++++--
 .../db/pipe/processor/twostage/state/State.java}   |  18 ++-
 .../thrift/IoTDBDataNodeReceiverAgent.java         |   3 +
 .../request/IoTDBConnectorRequestVersion.java      |   1 +
 17 files changed, 857 insertions(+), 53 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
index 555696d4147..31d38d00f6f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
@@ -25,17 +25,12 @@ import 
org.apache.iotdb.db.pipe.receiver.protocol.airgap.IoTDBAirGapReceiverAgen
 import 
org.apache.iotdb.db.pipe.receiver.protocol.legacy.IoTDBLegacyPipeReceiverAgent;
 import 
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.util.Arrays;
 
 /** {@link PipeDataNodeReceiverAgent} is the entry point of all pipe 
receivers' logic. */
 public class PipeDataNodeReceiverAgent {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataNodeReceiverAgent.class);
-
   private final IoTDBDataNodeReceiverAgent thriftAgent;
   private final IoTDBAirGapReceiverAgent airGapAgent;
   private final IoTDBLegacyPipeReceiverAgent legacyAgent;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 8c806fe2999..b8ace00dd2f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -411,4 +411,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // Set pipe meta status to STOPPED
     pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
   }
+
+  ///////////////////////// Utils /////////////////////////
+
+  public Set<Integer> getPipeTaskRegionIdSet(String pipeName, long 
creationTime) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    return pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != 
creationTime
+        ? Collections.emptySet()
+        : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java
new file mode 100644
index 00000000000..f0fec8e987f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+public class TwoStageCountProcessor implements PipeProcessor {
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {}
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {}
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector)
+      throws Exception {}
+
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
+      throws Exception {
+    PipeProcessor.super.process(tsFileInsertionEvent, eventCollector);
+  }
+
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws 
Exception {}
+
+  @Override
+  public void close() throws Exception {}
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
new file mode 100644
index 00000000000..adffe7bb56b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class Combiner {
+
+  private static final int MAX_COMBINER_LIVE_TIME_IN_MS = 1000 * 60 * 30; // 
30 minutes
+  private final long creationTimeInMs;
+
+  private final Operator operator;
+
+  private final Set<Integer> expectedRegionIdSet;
+  private final Set<Integer> receivedRegionIdSet;
+
+  private final AtomicBoolean isComplete = new AtomicBoolean(false);
+
+  public Combiner(Operator operator, Set<Integer> expectedRegionIdSet) {
+    this.creationTimeInMs = System.currentTimeMillis();
+
+    this.operator = operator;
+
+    this.expectedRegionIdSet = expectedRegionIdSet;
+    this.receivedRegionIdSet = new HashSet<>();
+  }
+
+  /**
+   * Merges the partial {@code state} reported by {@code regionId} into the operator.
+   *
+   * <p>Reports from regions that are not in {@code expectedRegionIdSet} are ignored. Once the set
+   * of received regions equals the expected set, {@code operator.onComplete()} is invoked and this
+   * combiner is marked complete.
+   *
+   * @param regionId id of the region that produced {@code state}
+   * @param state partial state to merge
+   */
+  public void combine(int regionId, State state) {
+    // Only regions in the expected set may contribute.
+    if (!expectedRegionIdSet.contains(regionId)) {
+      return;
+    }
+
+    receivedRegionIdSet.add(regionId);
+    operator.combine(state);
+
+    // Completion is reached only when every expected region has reported.
+    if (receivedRegionIdSet.equals(expectedRegionIdSet)) {
+      operator.onComplete();
+      isComplete.set(true);
+    }
+  }
+
+  public boolean isOutdated() {
+    return System.currentTimeMillis() - creationTimeInMs > 
MAX_COMBINER_LIVE_TIME_IN_MS;
+  }
+
+  public boolean isComplete() {
+    return isComplete.get();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
new file mode 100644
index 00000000000..de417f24a81
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeCombineHandler {
+
+  private final String pipeName;
+  private final long creationTime;
+
+  private final Operator operator;
+
+  private final Set<Integer> expectedRegionIdSet;
+
+  private final ConcurrentMap<String, Combiner> combineId2Combiner;
+
+  public PipeCombineHandler(String pipeName, long creationTime, Operator 
operator) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+
+    this.operator = operator;
+
+    expectedRegionIdSet = new HashSet<>();
+    fetchExpectedRegionIdSet();
+
+    combineId2Combiner = new ConcurrentHashMap<>();
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  public synchronized void combine(int regionId, String combineId, State 
state) {
+    combineId2Combiner
+        .computeIfAbsent(combineId, id -> new Combiner(operator, 
expectedRegionIdSet))
+        .combine(regionId, state);
+  }
+
+  public synchronized FetchCombineResultResponse 
fetchCombineResult(List<String> combineIdList)
+      throws IOException {
+    final Map<String, FetchCombineResultResponse.CombineResultType> 
combineId2ResultType =
+        new HashMap<>();
+    for (String combineId : combineIdList) {
+      final Combiner combiner = combineId2Combiner.get(combineId);
+
+      if (combiner == null || combiner.isOutdated()) {
+        combineId2ResultType.put(combineId, 
FetchCombineResultResponse.CombineResultType.OUTDATED);
+        continue;
+      }
+
+      combineId2ResultType.put(
+          combineId,
+          combiner.isComplete()
+              ? FetchCombineResultResponse.CombineResultType.SUCCESS
+              : FetchCombineResultResponse.CombineResultType.INCOMPLETE);
+    }
+
+    return 
FetchCombineResultResponse.toTPipeTransferResp(combineId2ResultType);
+  }
+
+  public synchronized void fetchExpectedRegionIdSet() {
+    expectedRegionIdSet.clear();
+    
expectedRegionIdSet.addAll(PipeAgent.task().getPipeTaskRegionIdSet(pipeName, 
creationTime));
+  }
+
+  public synchronized void cleanOutdatedCombiner() {
+    combineId2Combiner.entrySet().removeIf(entry -> 
entry.getValue().isOutdated());
+  }
+
+  public synchronized void close() {
+    combineId2Combiner.clear();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
new file mode 100644
index 00000000000..185cf840a20
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PipeCombineHandlerManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeCombineHandlerManager.class);
+
+  private final ConcurrentMap<String, PipeCombineHandler> 
pipeId2CombineHandler =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, AtomicInteger> pipeId2ReferenceCount =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Registers one user of the combine handler identified by {@code pipeName} and {@code
+   * creationTime}, creating the handler on first registration and incrementing its reference
+   * count.
+   *
+   * @param pipeName name of the pipe
+   * @param creationTime creation time of the pipe, used together with the name as its id
+   * @param operator operator handed to a newly created handler
+   */
+  public synchronized void register(String pipeName, long creationTime, Operator operator) {
+    final String pipeId = generatePipeId(pipeName, creationTime);
+
+    // computeIfAbsent builds the handler lazily; putIfAbsent would construct (and discard) a new
+    // PipeCombineHandler on every call, and its constructor queries the task agent.
+    pipeId2CombineHandler.computeIfAbsent(
+        pipeId, id -> new PipeCombineHandler(pipeName, creationTime, operator));
+    pipeId2ReferenceCount.computeIfAbsent(pipeId, id -> new AtomicInteger(0)).incrementAndGet();
+  }
+
+  public synchronized void deregister(String pipeName, long creationTime) {
+    final String pipeId = generatePipeId(pipeName, creationTime);
+
+    if (pipeId2ReferenceCount.containsKey(pipeId)
+        && pipeId2ReferenceCount.get(pipeId).decrementAndGet() <= 0) {
+      pipeId2ReferenceCount.remove(pipeId);
+      try {
+        pipeId2CombineHandler.remove(pipeId).close();
+      } catch (Exception e) {
+        LOGGER.warn("Error occurred when closing CombineHandler(id = {})", 
pipeId, e);
+      }
+    }
+  }
+
+  public FetchCombineResultResponse handle(FetchCombineResultRequest 
fetchCombineResultRequest)
+      throws IOException {
+    final String pipeId =
+        generatePipeId(
+            fetchCombineResultRequest.getPipeName(), 
fetchCombineResultRequest.getCreationTime());
+
+    final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
+    if (Objects.isNull(handler)) {
+      throw new PipeException("CombineHandler not found for pipeId = " + 
pipeId);
+    }
+
+    return 
handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList());
+  }
+
+  public TPipeTransferResp handle(CombineRequest combineRequest) {
+    final String pipeId =
+        generatePipeId(combineRequest.getPipeName(), 
combineRequest.getCreationTime());
+
+    final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
+    if (Objects.isNull(handler)) {
+      throw new PipeException("CombineHandler not found for pipeId = " + 
pipeId);
+    }
+
+    handler.combine(
+        combineRequest.getRegionId(), combineRequest.getCombineId(), 
combineRequest.getState());
+    return new 
TPipeTransferResp().setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+  }
+
+  public void fetchExpectedRegionIdSetAndCleanOutdatedCombiner() {
+    final Map<String, PipeCombineHandler> pipeId2CombineHandlerSnapshot;
+    synchronized (this) {
+      pipeId2CombineHandlerSnapshot = new HashMap<>(pipeId2CombineHandler);
+    }
+
+    pipeId2CombineHandlerSnapshot.forEach(
+        (pipeId, handler) -> {
+          handler.fetchExpectedRegionIdSet();
+          handler.cleanOutdatedCombiner();
+        });
+  }
+
+  private static String generatePipeId(String pipeName, long creationTime) {
+    return pipeName + "-" + creationTime;
+  }
+
+  /////////////////////////////// Singleton ///////////////////////////////
+
+  private PipeCombineHandlerManager() {
+    PipeAgent.runtime()
+        .registerPeriodicalJob(
+            
"CombineHandlerManager#fetchExpectedRegionIdSetAndCleanOutdatedCombiner",
+            this::fetchExpectedRegionIdSetAndCleanOutdatedCombiner,
+            600);
+  }
+
+  private static class CombineHandlerManagerHolder {
+    private static final PipeCombineHandlerManager INSTANCE = new 
PipeCombineHandlerManager();
+  }
+
+  public static PipeCombineHandlerManager getInstance() {
+    return CombineHandlerManagerHolder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
new file mode 100644
index 00000000000..f02cd691822
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CombineRequest extends TPipeTransferReq {
+
+  private String pipeName;
+  private long creationTime;
+  private int regionId;
+  private String combineId;
+
+  private State state;
+
+  private CombineRequest() {
+    // Empty constructor
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  public int getRegionId() {
+    return regionId;
+  }
+
+  public String getCombineId() {
+    return combineId;
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  public static CombineRequest toTPipeTransferReq(
+      String pipeName, long creationTime, int regionId, String combineId, 
State state)
+      throws IOException {
+    return new CombineRequest()
+        .convertToTPipeTransferReq(pipeName, creationTime, regionId, 
combineId, state);
+  }
+
+  public static CombineRequest fromTPipeTransferReq(TPipeTransferReq 
transferReq) throws Exception {
+    return new CombineRequest().translateFromTPipeTransferReq(transferReq);
+  }
+
+  /**
+   * Fills in this request's fields and serializes them into the thrift {@code body}.
+   *
+   * <p>The write order must mirror the read order in {@code translateFromTPipeTransferReq}:
+   * pipeName, creationTime, regionId, combineId, state class name, state payload.
+   *
+   * @return this request
+   * @throws IOException if serialization fails
+   */
+  private CombineRequest convertToTPipeTransferReq(
+      String pipeName, long creationTime, int regionId, String combineId, State state)
+      throws IOException {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+    this.regionId = regionId;
+    this.combineId = combineId;
+    this.state = state;
+
+    this.version = IoTDBConnectorRequestVersion.VERSION_2.getVersion();
+    this.type = RequestType.COMBINE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(pipeName, outputStream);
+      ReadWriteIOUtils.write(creationTime, outputStream);
+      ReadWriteIOUtils.write(regionId, outputStream);
+      // The reader expects combineId between regionId and the state class name.
+      ReadWriteIOUtils.write(combineId, outputStream);
+
+      ReadWriteIOUtils.write(state.getClass().getName(), outputStream);
+      state.serialize(outputStream);
+
+      this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return this;
+  }
+
+  /**
+   * Populates this request by reading {@code transferReq.body} in the order pipeName,
+   * creationTime, regionId, combineId, state class name, state payload.
+   *
+   * @return this request
+   * @throws Exception if the state class cannot be resolved, is not a {@link State}, cannot be
+   *     instantiated, or fails to deserialize
+   */
+  private CombineRequest translateFromTPipeTransferReq(TPipeTransferReq transferReq)
+      throws Exception {
+    pipeName = ReadWriteIOUtils.readString(transferReq.body);
+    creationTime = ReadWriteIOUtils.readLong(transferReq.body);
+    regionId = ReadWriteIOUtils.readInt(transferReq.body);
+    combineId = ReadWriteIOUtils.readString(transferReq.body);
+
+    final String stateClassName = ReadWriteIOUtils.readString(transferReq.body);
+    // The class name is read from the request body: resolve it without running static
+    // initializers and reject anything that is not a State before constructing an instance.
+    state =
+        Class.forName(stateClassName, false, CombineRequest.class.getClassLoader())
+            .asSubclass(State.class)
+            .getDeclaredConstructor()
+            .newInstance();
+    state.deserialize(transferReq.body);
+
+    version = transferReq.version;
+    type = transferReq.type;
+    body = transferReq.body;
+
+    return this;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
new file mode 100644
index 00000000000..ed1e2e13963
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FetchCombineResultRequest extends TPipeTransferReq {
+
+  private String pipeName;
+  private long creationTime;
+  private List<String> combineIdList;
+
+  private FetchCombineResultRequest() {
+    // Empty constructor
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  public List<String> getCombineIdList() {
+    return combineIdList;
+  }
+
+  public static FetchCombineResultRequest toTPipeTransferReq(
+      String pipeName, long creationTime, List<String> combineIdList) throws 
IOException {
+    return new FetchCombineResultRequest()
+        .convertToTPipeTransferReq(pipeName, creationTime, combineIdList);
+  }
+
+  public static FetchCombineResultRequest 
fromTPipeTransferReq(TPipeTransferReq transferReq)
+      throws Exception {
+    return new 
FetchCombineResultRequest().translateFromTPipeTransferReq(transferReq);
+  }
+
+  /**
+   * Fills in this request's fields, stamps the request version and type, and serializes
+   * pipeName, creationTime and the combine id list (size followed by each id) into {@code body}.
+   *
+   * @return this request
+   * @throws IOException if serialization fails
+   */
+  private FetchCombineResultRequest convertToTPipeTransferReq(
+      String pipeName, long creationTime, List<String> combineIdList) throws IOException {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+    this.combineIdList = combineIdList;
+
+    // Stamp version and type like CombineRequest does; left unset, type would default to 0,
+    // which is RequestType.COMBINE. The version enum is fully qualified because this file does
+    // not import it.
+    this.version =
+        org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion
+            .VERSION_2
+            .getVersion();
+    this.type = RequestType.FETCH_COMBINE_RESULT.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(pipeName, outputStream);
+      ReadWriteIOUtils.write(creationTime, outputStream);
+
+      ReadWriteIOUtils.write(combineIdList.size(), outputStream);
+      for (String combineId : combineIdList) {
+        ReadWriteIOUtils.write(combineId, outputStream);
+      }
+
+      this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return this;
+  }
+
+  private FetchCombineResultRequest 
translateFromTPipeTransferReq(TPipeTransferReq transferReq)
+      throws Exception {
+    pipeName = ReadWriteIOUtils.readString(transferReq.body);
+    creationTime = ReadWriteIOUtils.readLong(transferReq.body);
+
+    combineIdList = new ArrayList<>();
+    final int combineIdListSize = ReadWriteIOUtils.readInt(transferReq.body);
+    for (int i = 0; i < combineIdListSize; i++) {
+      combineIdList.add(ReadWriteIOUtils.readString(transferReq.body));
+    }
+
+    version = transferReq.version;
+    type = transferReq.type;
+    body = transferReq.body;
+
+    return this;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
new file mode 100644
index 00000000000..a40309520b7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FetchCombineResultResponse extends TPipeTransferResp {
+
+  public enum CombineResultType {
+    SUCCESS,
+    INCOMPLETE,
+    OUTDATED,
+  }
+
+  private Map<String, CombineResultType> combineId2ResultType = new 
HashMap<>();
+
+  private FetchCombineResultResponse() {
+    // Empty constructor
+  }
+
+  public Map<String, CombineResultType> getCombineId2ResultType() {
+    return combineId2ResultType;
+  }
+
+  public static FetchCombineResultResponse toTPipeTransferResp(
+      Map<String, CombineResultType> combineId2ResultType) throws IOException {
+    final FetchCombineResultResponse response = new 
FetchCombineResultResponse();
+
+    response.combineId2ResultType = combineId2ResultType;
+
+    try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(combineId2ResultType.size(), outputStream);
+      for (Map.Entry<String, CombineResultType> entry : 
combineId2ResultType.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue().ordinal(), outputStream);
+      }
+
+      response.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    return response;
+  }
+
+  /**
+   * Rebuilds a {@link FetchCombineResultResponse} from a generic transfer response by copying its
+   * status and, when a body is present, decoding the combineId-to-result-type map written by
+   * {@code toTPipeTransferResp} (entry count, then each id followed by the enum ordinal).
+   *
+   * @param transferResp response received from the remote side
+   * @return the decoded response; its map is empty when {@code transferResp} carries no body
+   */
+  public static FetchCombineResultResponse fromTPipeTransferResp(TPipeTransferResp transferResp) {
+    final FetchCombineResultResponse response = new FetchCombineResultResponse();
+
+    response.status = transferResp.status;
+
+    response.combineId2ResultType = new HashMap<>();
+    // Inspect the incoming response; the freshly constructed one never has a body set.
+    if (transferResp.isSetBody()) {
+      final int size = ReadWriteIOUtils.readInt(transferResp.body);
+      for (int i = 0; i < size; i++) {
+        final String combineId = ReadWriteIOUtils.readString(transferResp.body);
+        final CombineResultType resultType =
+            CombineResultType.values()[ReadWriteIOUtils.readInt(transferResp.body)];
+        response.combineId2ResultType.put(combineId, resultType);
+      }
+    }
+
+    return response;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
similarity index 50%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
index 9f853dd403e..99f2b16cbb3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
@@ -17,19 +17,39 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
 
-public enum IoTDBConnectorRequestVersion {
-  VERSION_1((byte) 1),
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Request types of the two-stage aggregation exchange.
+ *
+ * <p>Each constant is bound to a {@code short} code; {@link #valueOf(short)} maps a code back to
+ * its constant.
+ */
+public enum RequestType {
+  COMBINE((short) 0),
+  FETCH_COMBINE_RESULT((short) 1),
   ;
 
-  private final byte version;
+  // The short code bound to this constant.
+  private final short type;
+
+  RequestType(short type) {
+    this.type = type;
+  }
+
+  /** Returns the {@code short} code bound to this constant. */
+  public short getType() {
+    return type;
+  }
+
+  // Lookup table from short code to constant, filled from values() when the enum is initialized.
+  private static final Map<Short, RequestType> TYPE_MAP =
+      Arrays.stream(RequestType.values())
+          .collect(
+              HashMap::new,
+              (typeMap, pipeRequestType) -> 
typeMap.put(pipeRequestType.getType(), pipeRequestType),
+              HashMap::putAll);
 
-  IoTDBConnectorRequestVersion(byte type) {
-    this.version = type;
+  /** Returns {@code true} if {@code type} is the code of one of the constants. */
+  public static boolean isValidatedRequestType(short type) {
+    return TYPE_MAP.containsKey(type);
   }
 
-  public byte getVersion() {
-    return version;
+  /**
+   * Returns the constant whose code is {@code type}, or {@code null} when no constant has that
+   * code.
+   */
+  public static RequestType valueOf(short type) {
+    return TYPE_MAP.get(type);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
new file mode 100644
index 00000000000..a65abaefe52
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver;
+
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
+import 
org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.RequestType;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link IoTDBReceiver} for two-stage aggregation exchange requests, served as
+ * {@link IoTDBConnectorRequestVersion#VERSION_2}.
+ *
+ * <p>Each request is dispatched by its {@link RequestType} to the
+ * {@link PipeCombineHandlerManager} singleton. Failures are reported back as error statuses
+ * instead of being thrown.
+ */
+public class TwoStageAggregateReceiver implements IoTDBReceiver {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateReceiver.class);
+
+  /** Returns the request version this receiver serves, i.e. {@code VERSION_2}. */
+  @Override
+  public IoTDBConnectorRequestVersion getVersion() {
+    return IoTDBConnectorRequestVersion.VERSION_2;
+  }
+
+  /**
+   * Dispatches a transfer request to the combine handler manager according to its type.
+   *
+   * @param req the raw transfer request; its {@code type} field selects the handler
+   * @return the handler's response; a {@code PIPE_TYPE_ERROR} status when the type is not a known
+   *     {@link RequestType}; or a {@code PIPE_ERROR} status when handling throws
+   */
+  @Override
+  public TPipeTransferResp receive(TPipeTransferReq req) {
+    try {
+      final short rawRequestType = req.getType();
+      if (RequestType.isValidatedRequestType(rawRequestType)) {
+        switch (RequestType.valueOf(rawRequestType)) {
+          case COMBINE:
+            return PipeCombineHandlerManager.getInstance()
+                .handle(CombineRequest.fromTPipeTransferReq(req));
+          case FETCH_COMBINE_RESULT:
+            return PipeCombineHandlerManager.getInstance()
+                .handle(FetchCombineResultRequest.fromTPipeTransferReq(req));
+          default:
+            // A known type without a handler falls through to the type-error response below.
+            break;
+        }
+      }
+
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_TYPE_ERROR,
+              String.format("Unknown request type %s.", rawRequestType)));
+    } catch (Exception e) {
+      // Keep the stack trace in the local log: the response only carries e.getMessage(), which
+      // may be null and never includes the cause.
+      LOGGER.warn("Error occurs when receiving request.", e);
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_ERROR,
+              String.format("Error occurs when receiving request: %s.", e.getMessage())));
+    }
+  }
+
+  /** Only logs, at debug level, that the receiver is exiting. */
+  @Override
+  public void handleExit() {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Two stage aggregate receiver is exiting.");
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
similarity index 63%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
index 9f853dd403e..5af3c398c81 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
@@ -17,19 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.operator;
 
-public enum IoTDBConnectorRequestVersion {
-  VERSION_1((byte) 1),
-  ;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
 
-  private final byte version;
+/**
+ * {@link Operator} that keeps a running total of the counts of the {@link CountState}s it is
+ * given.
+ */
+public class CountOperator implements Operator {
 
-  IoTDBConnectorRequestVersion(byte type) {
-    this.version = type;
+  // Sum of getCount() over every state passed to combine() so far.
+  private long count = 0;
+
+  /**
+   * Adds the count held by {@code state} to the running total.
+   *
+   * @param state must be a {@link CountState}; it is cast without a check, so any other
+   *     implementation fails with a {@link ClassCastException}
+   */
+  @Override
+  public void combine(State state) {
+    final CountState countState = (CountState) state;
+    count += countState.getCount();
   }
 
-  public byte getVersion() {
-    return version;
+  /** Currently a no-op: the body only carries a placeholder comment. */
+  @Override
+  public void onComplete() {
+    // output to queue
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
similarity index 73%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
index 9f853dd403e..885c565442b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
@@ -17,19 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.operator;
 
-public enum IoTDBConnectorRequestVersion {
-  VERSION_1((byte) 1),
-  ;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
 
-  private final byte version;
+/** Contract for an operator that is fed {@link State} objects and then told it is complete. */
+public interface Operator {
 
-  IoTDBConnectorRequestVersion(byte type) {
-    this.version = type;
-  }
+  /** Merges the given {@code state} into this operator; the merge rule is up to implementations. */
+  void combine(State state);
 
-  public byte getVersion() {
-    return version;
-  }
+  // NOTE(review): presumably invoked once after the last combine() call; confirm with callers.
+  void onComplete();
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
similarity index 55%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
index 9f853dd403e..0bb826fd13a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
@@ -17,19 +17,37 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.state;
 
-public enum IoTDBConnectorRequestVersion {
-  VERSION_1((byte) 1),
-  ;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-  private final byte version;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
-  IoTDBConnectorRequestVersion(byte type) {
-    this.version = type;
+/** {@link State} holding a single {@code long} counter. */
+public class CountState implements State {
+
+  // Current value of the counter.
+  private long count;
+
+  // NOTE(review): package-private, so only code in this package can create instances; confirm
+  // that this is intended.
+  CountState() {
+    count = 0;
+  }
+
+  /** Adds {@code value} to the counter. */
+  public void accumulate(int value) {
+    count += value;
+  }
+
+  /** Returns the current value of the counter. */
+  public long getCount() {
+    return count;
+  }
+
+  /** Writes the counter to {@code outputStream} as a single {@code long}. */
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(count, outputStream);
   }
 
-  public byte getVersion() {
-    return version;
+  /** Replaces the counter with a {@code long} read from {@code byteBuffer}. */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    count = ReadWriteIOUtils.readLong(byteBuffer);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
similarity index 73%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
index 9f853dd403e..36d393e9ffa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
@@ -17,19 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+package org.apache.iotdb.db.pipe.processor.twostage.state;
 
-public enum IoTDBConnectorRequestVersion {
-  VERSION_1((byte) 1),
-  ;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
-  private final byte version;
+/** A piece of state that can write itself to a stream and restore itself from a buffer. */
+public interface State {
 
-  IoTDBConnectorRequestVersion(byte type) {
-    this.version = type;
-  }
+  /**
+   * Writes this state to {@code outputStream}.
+   *
+   * @throws IOException if writing to the stream fails
+   */
+  void serialize(OutputStream outputStream) throws IOException;
 
-  public byte getVersion() {
-    return version;
-  }
+  /** Restores this state from the content of {@code byteBuffer}. */
+  void deserialize(ByteBuffer byteBuffer);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
index 932123a4260..e0016eceac2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver.TwoStageAggregateReceiver;
 
 public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent {
 
@@ -31,6 +32,8 @@ public class IoTDBDataNodeReceiverAgent extends 
IoTDBReceiverAgent {
   protected void initConstructors() {
     RECEIVER_CONSTRUCTORS.put(
         IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
IoTDBDataNodeReceiver::new);
+    // VERSION_2 requests are served by the two-stage aggregate receiver.
+    RECEIVER_CONSTRUCTORS.put(
+        IoTDBConnectorRequestVersion.VERSION_2.getVersion(), 
TwoStageAggregateReceiver::new);
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
index 9f853dd403e..74219ef6b29 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
 
 public enum IoTDBConnectorRequestVersion {
   VERSION_1((byte) 1),
+  VERSION_2((byte) 2),
   ;
 
   private final byte version;


Reply via email to