This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch global-counter
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6f8537022cfef16577f1872081f401a1be9bd843
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Apr 12 19:21:27 2024 +0800

    2
---
 .../processor/twostage/TwoStageCountProcessor.java | 205 ++++++++++++++++++++-
 .../twostage/combiner/PipeCombineHandler.java      |  12 +-
 .../combiner/PipeCombineHandlerManager.java        |  26 +--
 .../exchange/sender/TwoStageAggregateSender.java   | 149 +++++++++++++++
 .../processor/twostage/operator/CountOperator.java |  20 +-
 .../pipe/processor/twostage/state/CountState.java  |   8 +-
 .../config/constant/PipeProcessorConstant.java     |   2 +
 7 files changed, 390 insertions(+), 32 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java
index f0fec8e987f..8d5a454a466 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java
@@ -19,6 +19,24 @@
 
 package org.apache.iotdb.db.pipe.processor.twostage;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent;
+import 
org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.sender.TwoStageAggregateSender;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.CountOperator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
@@ -27,29 +45,204 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TwoStageCountProcessor implements PipeProcessor {
 
+  private String pipeName;
+  private long creationTime;
+  private int regionId;
+  private PipeTaskMeta pipeTaskMeta;
+
+  private PartialPath outputSeries;
+
+  private final AtomicLong localCount = new AtomicLong(0);
+  private final AtomicReference<ProgressIndex> localCommitProgressIndex =
+      new AtomicReference<>(MinimumProgressIndex.INSTANCE);
+
+  private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local 
count], progress index) */>
+      localRequestQueue = new ConcurrentLinkedQueue<>();
+  private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local 
count], progress index) */>
+      localCommitQueue = new ConcurrentLinkedQueue<>();
+
+  private TwoStageAggregateSender twoStageAggregateSender;
+  private final Queue<Pair<Long, Long> /* (timestamp, global count) */> 
globalCountQueue =
+      new ConcurrentLinkedQueue<>();
+
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {}
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // The output series is mandatory: it names the series the global count is written to.
+    validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+
+    final String rawOutputSeries =
+        validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+    try {
+      PathUtils.isLegalPath(rawOutputSeries);
+    } catch (IllegalPathException e) {
+      // Keep the original exception as the cause so the reason the path was rejected is not lost.
+      throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries, e);
+    }
+  }
 
   @Override
   public void customize(PipeParameters parameters, 
PipeProcessorRuntimeConfiguration configuration)
-      throws Exception {}
+      throws Exception {
+    // Cache the identity of the pipe task this processor instance runs in.
+    final PipeTaskProcessorRuntimeEnvironment runtimeEnvironment =
+        (PipeTaskProcessorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    pipeName = runtimeEnvironment.getPipeName();
+    creationTime = runtimeEnvironment.getCreationTime();
+    regionId = runtimeEnvironment.getRegionId();
+    pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
+
+    // Series that finished global counts are written to (see collectGlobalCountIfNecessary).
+    outputSeries =
+        new 
PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY));
+
+    twoStageAggregateSender = new TwoStageAggregateSender();
+    // Register an operator factory for this pipe: it builds one CountOperator per combine id,
+    // each of which is handed this instance's globalCountQueue.
+    PipeCombineHandlerManager.getInstance()
+        .register(
+            pipeName, creationTime, (combineId) -> new 
CountOperator(combineId, globalCountQueue));
+  }
 
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector)
-      throws Exception {}
+      throws Exception {
+    // TODO: count the number of given points in the tablet
+    // TODO: ignore progress index report
+    // For now every tablet event adds exactly one to the local count, whatever it contains,
+    // and nothing is passed on to the event collector.
+    localCount.incrementAndGet();
+  }
 
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
       throws Exception {
-    PipeProcessor.super.process(tsFileInsertionEvent, eventCollector);
+    // TODO: count the number of points in the TsFile
+    // For now every TsFile event adds exactly one to the local count, whatever it contains,
+    // and nothing is passed on to the event collector.
+    localCount.incrementAndGet();
   }
 
   @Override
-  public void process(Event event, EventCollector eventCollector) throws 
Exception {}
+  public void process(Event event, EventCollector eventCollector) throws 
Exception {
+    // Heartbeat events drive the periodic work, in this order: emit finished global counts,
+    // check the combine requests awaiting commit, then re-send combine requests that failed.
+    if (event instanceof PipeHeartbeatEvent) {
+      collectGlobalCountIfNecessary(eventCollector);
+      commitLocalProgressIndexIfNecessary();
+      triggerCombineIfNecessary();
+    }
+
+    // A watermark event submits the current local count, keyed by the watermark value, together
+    // with the current local commit progress index.
+    if (event instanceof PipeWatermarkEvent) {
+      triggerCombine(
+          ((PipeWatermarkEvent) event).getWatermark(),
+          localCount.get(),
+          localCommitProgressIndex.get());
+    }
+  }
+
+  private void collectGlobalCountIfNecessary(EventCollector eventCollector) throws IOException {
+    // Drain every finished (timestamp, global count) pair and emit each one as a single-row
+    // INT64 tablet on the configured output series.
+    while (!globalCountQueue.isEmpty()) {
+      final Pair<Long, Long> finished = globalCountQueue.poll();
+
+      final String device = outputSeries.getDevice();
+      final MeasurementSchema countSchema =
+          new MeasurementSchema(outputSeries.getMeasurement(), TSDataType.INT64);
+      final Tablet singleRowTablet = new Tablet(device, Collections.singletonList(countSchema), 1);
+
+      singleRowTablet.rowSize = 1;
+      singleRowTablet.addTimestamp(0, finished.left);
+      singleRowTablet.addValue(outputSeries.getMeasurement(), 0, finished.right);
+
+      eventCollector.collect(
+          new PipeRawTabletInsertionEvent(singleRowTablet, false, null, null, null, false));
+    }
+  }
+
+  private void commitLocalProgressIndexIfNecessary() throws TException, IOException {
+    // Visit only the entries that were pending when this round started. An INCOMPLETE entry is
+    // put back into the queue; draining "until empty" would re-fetch it over RPC in a tight loop
+    // until the combine finishes, blocking the caller for an unbounded time. Entries that are
+    // put back are looked at again on the next call.
+    for (int remaining = localCommitQueue.size(); remaining > 0; --remaining) {
+      final Pair<long[], ProgressIndex> pair = localCommitQueue.poll();
+      if (pair == null) {
+        // The queue was drained in the meantime; nothing left to check.
+        break;
+      }
+      try {
+        // TODO: optimize the combine result fetching with batch fetching
+        final FetchCombineResultResponse fetchCombineResultResponse =
+            FetchCombineResultResponse.fromTPipeTransferResp(
+                twoStageAggregateSender.request(
+                    pair.left[0],
+                    FetchCombineResultRequest.toTPipeTransferReq(
+                        pipeName,
+                        creationTime,
+                        Collections.singletonList(Long.toString(pair.left[0])))));
+        if (fetchCombineResultResponse.getStatus().getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new PipeException(
+              "Failed to fetch combine result: "
+                  + fetchCombineResultResponse.getStatus().getMessage());
+        }
+        fetchCombineResultResponse
+            .getCombineId2ResultType()
+            .forEach(
+                (combineId, resultType) -> {
+                  switch (resultType) {
+                    case OUTDATED:
+                      // Drop the entry: it is neither committed nor retried.
+                      return;
+                    case INCOMPLETE:
+                      // Not finished yet; keep the entry for a later round.
+                      localCommitQueue.add(pair);
+                      return;
+                    case SUCCESS:
+                      // TODO: record count in the progress index
+                      pipeTaskMeta.updateProgressIndex(pair.right);
+                      return;
+                    default:
+                      throw new PipeException("Unknown combine result type: " + resultType);
+                  }
+                });
+      } catch (Exception e) {
+        // Keep the entry so it is checked again, then surface the failure to the caller.
+        localCommitQueue.add(pair);
+        throw e;
+      }
+    }
+  }
+
+  private void triggerCombineIfNecessary() throws TException, IOException {
+    // Re-submit every combine request that was parked in the request queue. triggerCombine puts
+    // a request back and throws when it fails again, which also ends this loop.
+    while (!localRequestQueue.isEmpty()) {
+      final Pair<long[], ProgressIndex> pending = localRequestQueue.poll();
+      final long watermark = pending.left[0];
+      final long count = pending.left[1];
+      triggerCombine(watermark, count, pending.right);
+    }
+  }
+
+  private void triggerCombine(long watermark, long count, ProgressIndex progressIndex)
+      throws TException, IOException {
+    // The same (watermark, count, progress index) triple is queued either for commit (request
+    // accepted) or for a later retry (request failed).
+    final long[] watermarkAndCount = {watermark, count};
+    final Pair<long[], ProgressIndex> pair = new Pair<>(watermarkAndCount, progressIndex);
+    try {
+      final TPipeTransferResp response =
+          twoStageAggregateSender.request(
+              watermark,
+              CombineRequest.toTPipeTransferReq(
+                  pipeName,
+                  creationTime,
+                  regionId,
+                  Long.toString(watermark),
+                  new CountState(count)));
+      final boolean accepted =
+          response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+      if (!accepted) {
+        throw new PipeException("Failed to combine count: " + response.getStatus().getMessage());
+      }
+      localCommitQueue.add(pair);
+    } catch (Exception e) {
+      localRequestQueue.add(pair);
+      throw e;
+    }
+  }
 
   @Override
-  public void close() throws Exception {}
+  public void close() throws Exception {
+    try {
+      PipeCombineHandlerManager.getInstance().deregister(pipeName, creationTime);
+    } finally {
+      // Release the sender even if deregistration throws. The field is still null when
+      // customize() did not get as far as creating the sender.
+      if (twoStageAggregateSender != null) {
+        twoStageAggregateSender.close();
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
index de417f24a81..cd8eef6a06e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
@@ -32,23 +32,25 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
 
 public class PipeCombineHandler {
 
   private final String pipeName;
   private final long creationTime;
 
-  private final Operator operator;
+  private final Function<String, Operator> /* <combineId, operator> */ 
operatorConstructor;
 
   private final Set<Integer> expectedRegionIdSet;
 
   private final ConcurrentMap<String, Combiner> combineId2Combiner;
 
-  public PipeCombineHandler(String pipeName, long creationTime, Operator 
operator) {
+  public PipeCombineHandler(
+      String pipeName, long creationTime, Function<String, Operator> 
operatorConstructor) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
 
-    this.operator = operator;
+    this.operatorConstructor = operatorConstructor;
 
     expectedRegionIdSet = new HashSet<>();
     fetchExpectedRegionIdSet();
@@ -66,7 +68,9 @@ public class PipeCombineHandler {
 
   public synchronized void combine(int regionId, String combineId, State 
state) {
     combineId2Combiner
-        .computeIfAbsent(combineId, id -> new Combiner(operator, 
expectedRegionIdSet))
+        .computeIfAbsent(
+            combineId,
+            id -> new Combiner(operatorConstructor.apply(combineId), 
expectedRegionIdSet))
         .combine(regionId, state);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
index 185cf840a20..418d7045059 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
@@ -39,6 +39,7 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
 public class PipeCombineHandlerManager {
 
@@ -49,11 +50,12 @@ public class PipeCombineHandlerManager {
   private final ConcurrentMap<String, AtomicInteger> pipeId2ReferenceCount =
       new ConcurrentHashMap<>();
 
-  public synchronized void register(String pipeName, long creationTime, 
Operator operator) {
+  public synchronized void register(
+      String pipeName, long creationTime, Function<String, Operator> 
operatorConstructor) {
     final String pipeId = generatePipeId(pipeName, creationTime);
 
     pipeId2CombineHandler.putIfAbsent(
-        pipeId, new PipeCombineHandler(pipeName, creationTime, operator));
+        pipeId, new PipeCombineHandler(pipeName, creationTime, 
operatorConstructor));
     pipeId2ReferenceCount.putIfAbsent(pipeId, new AtomicInteger(0));
 
     pipeId2ReferenceCount.get(pipeId).incrementAndGet();
@@ -73,32 +75,32 @@ public class PipeCombineHandlerManager {
     }
   }
 
-  public FetchCombineResultResponse handle(FetchCombineResultRequest 
fetchCombineResultRequest)
-      throws IOException {
+  public TPipeTransferResp handle(CombineRequest combineRequest) {
     final String pipeId =
-        generatePipeId(
-            fetchCombineResultRequest.getPipeName(), 
fetchCombineResultRequest.getCreationTime());
+        generatePipeId(combineRequest.getPipeName(), 
combineRequest.getCreationTime());
 
     final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
     if (Objects.isNull(handler)) {
       throw new PipeException("CombineHandler not found for pipeId = " + 
pipeId);
     }
 
-    return 
handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList());
+    handler.combine(
+        combineRequest.getRegionId(), combineRequest.getCombineId(), 
combineRequest.getState());
+    return new 
TPipeTransferResp().setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
 
-  public TPipeTransferResp handle(CombineRequest combineRequest) {
+  public FetchCombineResultResponse handle(FetchCombineResultRequest 
fetchCombineResultRequest)
+      throws IOException {
     final String pipeId =
-        generatePipeId(combineRequest.getPipeName(), 
combineRequest.getCreationTime());
+        generatePipeId(
+            fetchCombineResultRequest.getPipeName(), 
fetchCombineResultRequest.getCreationTime());
 
     final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
     if (Objects.isNull(handler)) {
       throw new PipeException("CombineHandler not found for pipeId = " + 
pipeId);
     }
 
-    handler.combine(
-        combineRequest.getRegionId(), combineRequest.getCombineId(), 
combineRequest.getState());
-    return new 
TPipeTransferResp().setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+    return 
handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList());
   }
 
   public void fetchExpectedRegionIdSetAndCleanOutdatedCombiner() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
new file mode 100644
index 00000000000..2234bd7846b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TwoStageAggregateSender implements AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TwoStageAggregateSender.class);
+
+  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+  private static final Map<TEndPoint, IoTDBSyncClient> 
ENDPOINT_SYNC_CLIENT_MAP =
+      new ConcurrentHashMap<>();
+  private static final AtomicInteger REFERENCE_COUNT = new AtomicInteger(0);
+
+  private final TEndPoint[] endPoints;
+
+  /**
+   * Creates a sender backed by the shared, statically cached clients. The first sender (or any
+   * sender created while the cache is empty) asks the config node for the data node list and
+   * opens one client per data node.
+   *
+   * @throws PipeException if the data node list cannot be fetched or a client cannot be created
+   */
+  public TwoStageAggregateSender() {
+    synchronized (ENDPOINT_SYNC_CLIENT_MAP) {
+      if (ENDPOINT_SYNC_CLIENT_MAP.isEmpty()) {
+        try (final ConfigNodeClient configNodeClient =
+            ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+          final TShowDataNodesResp showDataNodesResp = configNodeClient.showDataNodes();
+          if (showDataNodesResp == null || showDataNodesResp.getDataNodesInfoList() == null) {
+            throw new PipeException("Failed to fetch data nodes");
+          }
+          for (final TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) {
+            final TEndPoint endPoint =
+                new TEndPoint(dataNodeInfo.getRpcAddresss(), dataNodeInfo.getRpcPort());
+            ENDPOINT_SYNC_CLIENT_MAP.put(endPoint, constructIoTDBSyncClient(endPoint));
+          }
+        } catch (ClientManagerException | TException e) {
+          // The map was empty on entry, so every client in it was created by this failed attempt.
+          // Close and drop them; otherwise the next constructor call would find a non-empty map
+          // and silently work with an incomplete endpoint list.
+          for (final IoTDBSyncClient client : ENDPOINT_SYNC_CLIENT_MAP.values()) {
+            try {
+              client.close();
+            } catch (Exception closeException) {
+              LOGGER.warn("Failed to close IoTDBSyncClient", closeException);
+            }
+          }
+          ENDPOINT_SYNC_CLIENT_MAP.clear();
+          throw new PipeException("Failed to fetch data nodes", e);
+        }
+
+        LOGGER.info(
+            "Data nodes' endpoints for two-stage aggregation: {}",
+            ENDPOINT_SYNC_CLIENT_MAP.keySet());
+      }
+
+      endPoints = ENDPOINT_SYNC_CLIENT_MAP.keySet().stream().sorted().toArray(TEndPoint[]::new);
+
+      // Count this sender only once it is fully initialized. A constructor that throws never
+      // yields an object whose close() could give the reference back, so counting earlier would
+      // leave the count above zero forever and the shared clients would never be closed.
+      REFERENCE_COUNT.incrementAndGet();
+    }
+  }
+
+  /**
+   * Sends {@code req} to the data node selected by {@code watermark}. The same watermark always
+   * maps to the same entry of the sorted endpoint array.
+   *
+   * @param watermark value used to pick the target endpoint
+   * @param req the request to transfer
+   * @return the response of the selected data node
+   * @throws TException if the client cannot be (re)created or the transfer fails
+   */
+  public TPipeTransferResp request(long watermark, TPipeTransferReq req) throws TException {
+    // The int cast of a timestamp-sized watermark is frequently negative, and "%" keeps the sign,
+    // which used to produce a negative array index. floorMod always yields [0, length) and picks
+    // the same endpoint as before whenever the old expression was non-negative.
+    final TEndPoint endPoint = endPoints[Math.floorMod((int) watermark, endPoints.length)];
+    IoTDBSyncClient client = ENDPOINT_SYNC_CLIENT_MAP.get(endPoint);
+    if (client == null) {
+      client = reconstructIoTDBSyncClient(endPoint);
+    }
+    try {
+      return client.pipeTransfer(req);
+    } catch (Exception e) {
+      // Replace the possibly broken client for the next call, but never let a failed reconnect
+      // hide the transfer failure that the caller needs to see.
+      try {
+        reconstructIoTDBSyncClient(endPoint);
+      } catch (Exception reconstructException) {
+        e.addSuppressed(reconstructException);
+      }
+      throw e;
+    }
+  }
+
+  private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
+      throws TTransportException {
+    synchronized (ENDPOINT_SYNC_CLIENT_MAP) {
+      // Take the cached client out first; closing it is best effort and only logged on failure.
+      final IoTDBSyncClient staleClient = ENDPOINT_SYNC_CLIENT_MAP.remove(endPoint);
+      if (staleClient != null) {
+        try {
+          staleClient.close();
+        } catch (Exception e) {
+          LOGGER.warn("Failed to close old IoTDBSyncClient", e);
+        }
+      }
+
+      // Cache and hand back a freshly opened replacement.
+      final IoTDBSyncClient freshClient = constructIoTDBSyncClient(endPoint);
+      ENDPOINT_SYNC_CLIENT_MAP.put(endPoint, freshClient);
+      return freshClient;
+    }
+  }
+
+  private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
+    // Connection timeout and RPC compression are taken from the pipe connector configuration.
+    final ThriftClientProperty clientProperty =
+        new ThriftClientProperty.Builder()
+            .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+            .setRpcThriftCompressionEnabled(
+                PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+            .build();
+    final String ip = endPoint.getIp();
+    final int port = endPoint.getPort();
+    return new IoTDBSyncClient(clientProperty, ip, port, false, null, null);
+  }
+
+  @Override
+  public void close() {
+    synchronized (ENDPOINT_SYNC_CLIENT_MAP) {
+      // The clients are shared by all senders; only the last sender to close tears them down.
+      final boolean stillReferenced = REFERENCE_COUNT.decrementAndGet() > 0;
+      if (stillReferenced) {
+        return;
+      }
+
+      ENDPOINT_SYNC_CLIENT_MAP
+          .values()
+          .forEach(
+              sharedClient -> {
+                try {
+                  sharedClient.close();
+                } catch (Exception e) {
+                  LOGGER.warn("Failed to close IoTDBSyncClient", e);
+                }
+              });
+      ENDPOINT_SYNC_CLIENT_MAP.clear();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
index 5af3c398c81..a313cfbd7db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
@@ -21,19 +21,31 @@ package 
org.apache.iotdb.db.pipe.processor.twostage.operator;
 
 import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
 import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Queue;
 
 public class CountOperator implements Operator {
 
-  private long count = 0;
+  // Timestamp attached to the result when it is emitted; parsed from the combine id.
+  private final long onCompletionTimestamp;
+  // Running sum of all counts combined so far.
+  private long globalCount;
+
+  // Destination for the finished (timestamp, global count) pair.
+  private final Queue<Pair<Long, Long>> globalCountQueue;
+
+  /**
+   * @param combineId decimal string that is parsed as the timestamp of the emitted result
+   * @param globalCountQueue queue that receives the result in {@link #onComplete()}
+   * @throws NumberFormatException if {@code combineId} is not a parsable long
+   */
+  public CountOperator(String combineId, Queue<Pair<Long, Long>> 
globalCountQueue) {
+    onCompletionTimestamp = Long.parseLong(combineId);
+    globalCount = 0;
+
+    this.globalCountQueue = globalCountQueue;
+  }
 
   @Override
   public void combine(State state) {
-    final CountState countState = (CountState) state;
-    count += countState.getCount();
+    // The state is cast to CountState without a type check; its count is added to the sum.
+    globalCount += ((CountState) state).getCount();
   }
 
   @Override
   public void onComplete() {
-    // output to queue
+    // Publish the accumulated sum under the timestamp derived from the combine id.
+    globalCountQueue.add(new Pair<>(onCompletionTimestamp, globalCount));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
index 0bb826fd13a..617d1242d51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
@@ -29,12 +29,8 @@ public class CountState implements State {
 
   private long count;
 
-  CountState() {
-    count = 0;
-  }
-
-  public void accumulate(int value) {
-    count += value;
+  public CountState(long count) {
+    this.count = count;
   }
 
   public long getCount() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index 719fa3c37e5..49fa2e2cab8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -66,6 +66,8 @@ public class PipeProcessorConstant {
       "processor.sdt.max-time-interval";
   public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE = 
Long.MAX_VALUE;
 
+  public static final String PROCESSOR_OUTPUT_SERIES_KEY = 
"processor.output-series";
+
   private PipeProcessorConstant() {
     throw new IllegalStateException("Utility class");
   }

Reply via email to