This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch global-counter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6f8537022cfef16577f1872081f401a1be9bd843 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Apr 12 19:21:27 2024 +0800 2 --- .../processor/twostage/TwoStageCountProcessor.java | 205 ++++++++++++++++++++- .../twostage/combiner/PipeCombineHandler.java | 12 +- .../combiner/PipeCombineHandlerManager.java | 26 +-- .../exchange/sender/TwoStageAggregateSender.java | 149 +++++++++++++++ .../processor/twostage/operator/CountOperator.java | 20 +- .../pipe/processor/twostage/state/CountState.java | 8 +- .../config/constant/PipeProcessorConstant.java | 2 + 7 files changed, 390 insertions(+), 32 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java index f0fec8e987f..8d5a454a466 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java @@ -19,6 +19,24 @@ package org.apache.iotdb.db.pipe.processor.twostage; +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; +import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.utils.PathUtils; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent; +import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.sender.TwoStageAggregateSender; +import org.apache.iotdb.db.pipe.processor.twostage.operator.CountOperator; +import org.apache.iotdb.db.pipe.processor.twostage.state.CountState; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; @@ -27,29 +45,204 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.Collections; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; public class TwoStageCountProcessor implements PipeProcessor { + private String pipeName; + private long creationTime; + private int regionId; + private PipeTaskMeta pipeTaskMeta; + + private PartialPath outputSeries; + + private final AtomicLong localCount = new AtomicLong(0); + private final AtomicReference<ProgressIndex> localCommitProgressIndex = + new AtomicReference<>(MinimumProgressIndex.INSTANCE); + + private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local count], progress index) */> + localRequestQueue = new ConcurrentLinkedQueue<>(); + private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local count], progress index) */> + localCommitQueue = new ConcurrentLinkedQueue<>(); + + private TwoStageAggregateSender twoStageAggregateSender; + private final Queue<Pair<Long, Long> /* (timestamp, global count) */> globalCountQueue = + new ConcurrentLinkedQueue<>(); + @Override - public void validate(PipeParameterValidator validator) throws Exception {} + public void validate(PipeParameterValidator validator) throws Exception { + validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY); + + final String rawOutputSeries = + validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY); + try { + PathUtils.isLegalPath(rawOutputSeries); + } catch (IllegalPathException e) { + throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries); + } + } @Override public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) - throws Exception {} + throws Exception { + final PipeTaskProcessorRuntimeEnvironment runtimeEnvironment = + (PipeTaskProcessorRuntimeEnvironment) configuration.getRuntimeEnvironment(); + pipeName = runtimeEnvironment.getPipeName(); + creationTime = runtimeEnvironment.getCreationTime(); + regionId = runtimeEnvironment.getRegionId(); + pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta(); + + outputSeries = + new PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY)); + + twoStageAggregateSender = new TwoStageAggregateSender(); + PipeCombineHandlerManager.getInstance() + .register( + pipeName, creationTime, (combineId) -> new CountOperator(combineId, globalCountQueue)); + } @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) - throws Exception {} + throws Exception { + // TODO: count the number of given points in the tablet + // TODO: ignore progress index report + localCount.incrementAndGet(); + } @Override public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws Exception { - PipeProcessor.super.process(tsFileInsertionEvent, eventCollector); + // TODO: count the number of points in the TsFile + localCount.incrementAndGet(); } @Override - public void process(Event event, EventCollector eventCollector) throws Exception {} + public void process(Event event, EventCollector eventCollector) throws Exception { + if (event instanceof PipeHeartbeatEvent) { + collectGlobalCountIfNecessary(eventCollector); + commitLocalProgressIndexIfNecessary(); + triggerCombineIfNecessary(); + } + + if (event instanceof PipeWatermarkEvent) { + triggerCombine( + ((PipeWatermarkEvent) event).getWatermark(), + localCount.get(), + localCommitProgressIndex.get()); + } + } + + private void collectGlobalCountIfNecessary(EventCollector eventCollector) throws IOException { + while (!globalCountQueue.isEmpty()) { + final Pair<Long, Long> timestampCountPair = globalCountQueue.poll(); + final Tablet tablet = + new Tablet( + outputSeries.getDevice(), + Collections.singletonList( + new MeasurementSchema(outputSeries.getMeasurement(), TSDataType.INT64)), + 1); + tablet.rowSize = 1; + tablet.addTimestamp(0, timestampCountPair.left); + tablet.addValue(outputSeries.getMeasurement(), 0, timestampCountPair.right); + eventCollector.collect( + new PipeRawTabletInsertionEvent(tablet, false, null, null, null, false)); + } + } + + private void commitLocalProgressIndexIfNecessary() throws TException, IOException { + while (!localCommitQueue.isEmpty()) { + final Pair<long[], ProgressIndex> pair = localCommitQueue.poll(); + try { + // TODO: optimize the combine result fetching with batch fetching + final FetchCombineResultResponse fetchCombineResultResponse = + FetchCombineResultResponse.fromTPipeTransferResp( + twoStageAggregateSender.request( + pair.left[0], + FetchCombineResultRequest.toTPipeTransferReq( + pipeName, + creationTime, + Collections.singletonList(Long.toString(pair.left[0]))))); + if (fetchCombineResultResponse.getStatus().getCode() + != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException( + "Failed to fetch combine result: " + + fetchCombineResultResponse.getStatus().getMessage()); + } + fetchCombineResultResponse + .getCombineId2ResultType() + .forEach( + (combineId, resultType) -> { + switch (resultType) { + case OUTDATED: + return; + case INCOMPLETE: + localCommitQueue.add(pair); + return; + case SUCCESS: + // TODO: record count in the progress index + pipeTaskMeta.updateProgressIndex(pair.right); + return; + default: + throw new PipeException("Unknown combine result type: " + resultType); + } + }); + } catch (Exception e) { + localCommitQueue.add(pair); + throw e; + } + } + } + + private void triggerCombineIfNecessary() throws TException, IOException { + while (!localRequestQueue.isEmpty()) { + final Pair<long[], ProgressIndex> timestampCountPair = localRequestQueue.poll(); + triggerCombine( + timestampCountPair.left[0], timestampCountPair.left[1], timestampCountPair.right); + } + } + + private void triggerCombine(long watermark, long count, ProgressIndex progressIndex) + throws TException, IOException { + final Pair<long[], ProgressIndex> pair = + new Pair<>(new long[] {watermark, count}, progressIndex); + try { + final TPipeTransferResp resp = + twoStageAggregateSender.request( + watermark, + CombineRequest.toTPipeTransferReq( + pipeName, + creationTime, + regionId, + Long.toString(watermark), + new CountState(count))); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException("Failed to combine count: " + resp.getStatus().getMessage()); + } + localCommitQueue.add(pair); + } catch (Exception e) { + localRequestQueue.add(pair); + throw e; + } + } @Override - public void close() throws Exception {} + public void close() throws Exception { + PipeCombineHandlerManager.getInstance().deregister(pipeName, creationTime); + twoStageAggregateSender.close(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java index de417f24a81..cd8eef6a06e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java @@ -32,23 +32,25 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; public class PipeCombineHandler { private final String pipeName; private final long creationTime; - private final Operator operator; + private final Function<String, Operator> /* <combineId, operator> */ operatorConstructor; private final Set<Integer> expectedRegionIdSet; private final ConcurrentMap<String, Combiner> combineId2Combiner; - public PipeCombineHandler(String pipeName, long creationTime, Operator operator) { + public PipeCombineHandler( + String pipeName, long creationTime, Function<String, Operator> operatorConstructor) { this.pipeName = pipeName; this.creationTime = creationTime; - this.operator = operator; + this.operatorConstructor = operatorConstructor; expectedRegionIdSet = new HashSet<>(); fetchExpectedRegionIdSet(); @@ -66,7 +68,9 @@ public class PipeCombineHandler { public synchronized void combine(int regionId, String combineId, State state) { combineId2Combiner - .computeIfAbsent(combineId, id -> new Combiner(operator, expectedRegionIdSet)) + .computeIfAbsent( + combineId, + id -> new Combiner(operatorConstructor.apply(combineId), expectedRegionIdSet)) .combine(regionId, state); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java index 185cf840a20..418d7045059 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java @@ -39,6 +39,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; public class PipeCombineHandlerManager { @@ -49,11 +50,12 @@ public class PipeCombineHandlerManager { private final ConcurrentMap<String, AtomicInteger> pipeId2ReferenceCount = new ConcurrentHashMap<>(); - public synchronized void register(String pipeName, long creationTime, Operator operator) { + public synchronized void register( + String pipeName, long creationTime, Function<String, Operator> operatorConstructor) { final String pipeId = generatePipeId(pipeName, creationTime); pipeId2CombineHandler.putIfAbsent( - pipeId, new PipeCombineHandler(pipeName, creationTime, operator)); + pipeId, new PipeCombineHandler(pipeName, creationTime, operatorConstructor)); pipeId2ReferenceCount.putIfAbsent(pipeId, new AtomicInteger(0)); pipeId2ReferenceCount.get(pipeId).incrementAndGet(); @@ -73,32 +75,32 @@ public class PipeCombineHandlerManager { } } - public FetchCombineResultResponse handle(FetchCombineResultRequest fetchCombineResultRequest) - throws IOException { + public TPipeTransferResp handle(CombineRequest combineRequest) { final String pipeId = - generatePipeId( - fetchCombineResultRequest.getPipeName(), fetchCombineResultRequest.getCreationTime()); + generatePipeId(combineRequest.getPipeName(), combineRequest.getCreationTime()); final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId); if (Objects.isNull(handler)) { throw new PipeException("CombineHandler not found for pipeId = " + pipeId); } - return handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList()); + handler.combine( + combineRequest.getRegionId(), combineRequest.getCombineId(), combineRequest.getState()); + return new TPipeTransferResp().setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); } - public TPipeTransferResp handle(CombineRequest combineRequest) { + public FetchCombineResultResponse handle(FetchCombineResultRequest fetchCombineResultRequest) + throws IOException { final String pipeId = - generatePipeId(combineRequest.getPipeName(), combineRequest.getCreationTime()); + generatePipeId( + fetchCombineResultRequest.getPipeName(), fetchCombineResultRequest.getCreationTime()); final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId); if (Objects.isNull(handler)) { throw new PipeException("CombineHandler not found for pipeId = " + pipeId); } - handler.combine( - combineRequest.getRegionId(), combineRequest.getCombineId(), combineRequest.getState()); - return new TPipeTransferResp().setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + return handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList()); } public void fetchExpectedRegionIdSetAndCleanOutdatedCombiner() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java new file mode 100644 index 00000000000..2234bd7846b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class TwoStageAggregateSender implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateSender.class); + + private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); + + private static final Map<TEndPoint, IoTDBSyncClient> ENDPOINT_SYNC_CLIENT_MAP = + new ConcurrentHashMap<>(); + private static final AtomicInteger REFERENCE_COUNT = new AtomicInteger(0); + + private final TEndPoint[] endPoints; + + public TwoStageAggregateSender() { + synchronized (ENDPOINT_SYNC_CLIENT_MAP) { + REFERENCE_COUNT.incrementAndGet(); + + if (ENDPOINT_SYNC_CLIENT_MAP.isEmpty()) { + try (final ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TShowDataNodesResp showDataNodesResp = configNodeClient.showDataNodes(); + if (showDataNodesResp == null || showDataNodesResp.getDataNodesInfoList() == null) { + throw new PipeException("Failed to fetch data nodes"); + } + for (final TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) { + final TEndPoint endPoint = + new TEndPoint(dataNodeInfo.getRpcAddresss(), dataNodeInfo.getRpcPort()); + ENDPOINT_SYNC_CLIENT_MAP.put(endPoint, constructIoTDBSyncClient(endPoint)); + } + } catch (ClientManagerException | TException e) { + throw new PipeException("Failed to fetch data nodes", e); + } + + LOGGER.info( + "Data nodes' endpoints for two-stage aggregation: {}", + ENDPOINT_SYNC_CLIENT_MAP.keySet()); + } + + endPoints = ENDPOINT_SYNC_CLIENT_MAP.keySet().stream().sorted().toArray(TEndPoint[]::new); + } + } + + public TPipeTransferResp request(long watermark, TPipeTransferReq req) throws TException { + final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length]; + IoTDBSyncClient client = ENDPOINT_SYNC_CLIENT_MAP.get(endPoint); + if (client == null) { + client = reconstructIoTDBSyncClient(endPoint); + } + try { + return client.pipeTransfer(req); + } catch (Exception e) { + reconstructIoTDBSyncClient(endPoint); + throw e; + } + } + + private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint) + throws TTransportException { + synchronized (ENDPOINT_SYNC_CLIENT_MAP) { + final IoTDBSyncClient oldClient = ENDPOINT_SYNC_CLIENT_MAP.remove(endPoint); + if (oldClient != null) { + try { + oldClient.close(); + } catch (Exception e) { + LOGGER.warn("Failed to close old IoTDBSyncClient", e); + } + } + final IoTDBSyncClient newClient = constructIoTDBSyncClient(endPoint); + ENDPOINT_SYNC_CLIENT_MAP.put(endPoint, newClient); + return newClient; + } + } + + private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException { + return new IoTDBSyncClient( + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs()) + .setRpcThriftCompressionEnabled( + PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled()) + .build(), + endPoint.getIp(), + endPoint.getPort(), + false, + null, + null); + } + + @Override + public void close() { + synchronized (ENDPOINT_SYNC_CLIENT_MAP) { + if (REFERENCE_COUNT.decrementAndGet() > 0) { + return; + } + + for (final IoTDBSyncClient client : ENDPOINT_SYNC_CLIENT_MAP.values()) { + try { + client.close(); + } catch (Exception e) { + LOGGER.warn("Failed to close IoTDBSyncClient", e); + } + } + + ENDPOINT_SYNC_CLIENT_MAP.clear(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java index 5af3c398c81..a313cfbd7db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java @@ -21,19 +21,31 @@ package org.apache.iotdb.db.pipe.processor.twostage.operator; import org.apache.iotdb.db.pipe.processor.twostage.state.CountState; import org.apache.iotdb.db.pipe.processor.twostage.state.State; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.util.Queue; public class CountOperator implements Operator { - private long count = 0; + private final long onCompletionTimestamp; + private long globalCount; + + private final Queue<Pair<Long, Long>> globalCountQueue; + + public CountOperator(String combineId, Queue<Pair<Long, Long>> globalCountQueue) { + onCompletionTimestamp = Long.parseLong(combineId); + globalCount = 0; + + this.globalCountQueue = globalCountQueue; + } @Override public void combine(State state) { - final CountState countState = (CountState) state; - count += countState.getCount(); + globalCount += ((CountState) state).getCount(); } @Override public void onComplete() { - // output to queue + globalCountQueue.add(new Pair<>(onCompletionTimestamp, globalCount)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java index 0bb826fd13a..617d1242d51 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java @@ -29,12 +29,8 @@ public class CountState implements State { private long count; - CountState() { - count = 0; - } - - public void accumulate(int value) { - count += value; + public CountState(long count) { + this.count = count; } public long getCount() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index 719fa3c37e5..49fa2e2cab8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -66,6 +66,8 @@ public class PipeProcessorConstant { "processor.sdt.max-time-interval"; public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE; + public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series"; + private PipeProcessorConstant() { throw new IllegalStateException("Utility class"); }
