This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch global-counter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 15c01584c5f26e7deae07b5835cc10368fe6d0f8 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Apr 11 22:31:19 2024 +0800 1st --- .../agent/receiver/PipeDataNodeReceiverAgent.java | 5 - .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 9 ++ .../processor/twostage/TwoStageCountProcessor.java | 55 ++++++++ .../pipe/processor/twostage/combiner/Combiner.java | 72 +++++++++++ .../twostage/combiner/PipeCombineHandler.java | 107 ++++++++++++++++ .../combiner/PipeCombineHandlerManager.java | 138 +++++++++++++++++++++ .../twostage/exchange/payload/CombineRequest.java | 119 ++++++++++++++++++ .../payload/FetchCombineResultRequest.java | 104 ++++++++++++++++ .../payload/FetchCombineResultResponse.java | 89 +++++++++++++ .../twostage/exchange/payload/RequestType.java} | 36 ++++-- .../receiver/TwoStageAggregateReceiver.java | 80 ++++++++++++ .../twostage/operator/CountOperator.java} | 22 ++-- .../processor/twostage/operator/Operator.java} | 16 +-- .../pipe/processor/twostage/state/CountState.java} | 36 ++++-- .../db/pipe/processor/twostage/state/State.java} | 18 ++- .../thrift/IoTDBDataNodeReceiverAgent.java | 3 + .../request/IoTDBConnectorRequestVersion.java | 1 + 17 files changed, 857 insertions(+), 53 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java index 555696d4147..31d38d00f6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java @@ -25,17 +25,12 @@ import org.apache.iotdb.db.pipe.receiver.protocol.airgap.IoTDBAirGapReceiverAgen import org.apache.iotdb.db.pipe.receiver.protocol.legacy.IoTDBLegacyPipeReceiverAgent; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.util.Arrays; /** {@link PipeDataNodeReceiverAgent} is the entry point of all pipe receivers' logic. */ public class PipeDataNodeReceiverAgent { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeReceiverAgent.class); - private final IoTDBDataNodeReceiverAgent thriftAgent; private final IoTDBAirGapReceiverAgent airGapAgent; private final IoTDBLegacyPipeReceiverAgent legacyAgent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 8c806fe2999..b8ace00dd2f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -411,4 +411,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { // Set pipe meta status to STOPPED pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED); } + + ///////////////////////// Utils ///////////////////////// + + public Set<Integer> getPipeTaskRegionIdSet(String pipeName, long creationTime) { + final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + return pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != creationTime + ? Collections.emptySet() + : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java new file mode 100644 index 00000000000..f0fec8e987f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/TwoStageCountProcessor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage; + +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; + +public class TwoStageCountProcessor implements PipeProcessor { + + @Override + public void validate(PipeParameterValidator validator) throws Exception {} + + @Override + public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) + throws Exception {} + + @Override + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) + throws Exception {} + + @Override + public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) + throws Exception { + PipeProcessor.super.process(tsFileInsertionEvent, eventCollector); + } + + @Override + public void process(Event event, EventCollector eventCollector) throws Exception {} + + @Override + public void close() throws Exception {} +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java new file mode 100644 index 00000000000..adffe7bb56b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.combiner; + +import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator; +import org.apache.iotdb.db.pipe.processor.twostage.state.State; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +public class Combiner { + + private static final int MAX_COMBINER_LIVE_TIME_IN_MS = 1000 * 60 * 30; // 30 minutes + private final long creationTimeInMs; + + private final Operator operator; + + private final Set<Integer> expectedRegionIdSet; + private final Set<Integer> receivedRegionIdSet; + + private final AtomicBoolean isComplete = new AtomicBoolean(false); + + public Combiner(Operator operator, Set<Integer> expectedRegionIdSet) { + this.creationTimeInMs = System.currentTimeMillis(); + + this.operator = operator; + + this.expectedRegionIdSet = expectedRegionIdSet; + this.receivedRegionIdSet = new HashSet<>(); + } + + public void combine(int regionId, State state) { + if (expectedRegionIdSet.contains(regionId)) { + return; + } + + receivedRegionIdSet.add(regionId); + operator.combine(state); + + if (receivedRegionIdSet.equals(expectedRegionIdSet)) { + operator.onComplete(); + } + + isComplete.set(true); + } + + public boolean isOutdated() { + return System.currentTimeMillis() - creationTimeInMs > MAX_COMBINER_LIVE_TIME_IN_MS; + } + + public boolean isComplete() { + return isComplete.get(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java new file mode 100644 index 00000000000..de417f24a81 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.combiner; + +import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse; +import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator; +import org.apache.iotdb.db.pipe.processor.twostage.state.State; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class PipeCombineHandler { + + private final String pipeName; + private final long creationTime; + + private final Operator operator; + + private final Set<Integer> expectedRegionIdSet; + + private final ConcurrentMap<String, Combiner> combineId2Combiner; + + public PipeCombineHandler(String pipeName, long creationTime, Operator operator) { + this.pipeName = pipeName; + this.creationTime = creationTime; + + this.operator = operator; + + expectedRegionIdSet = new HashSet<>(); + fetchExpectedRegionIdSet(); + + combineId2Combiner = new ConcurrentHashMap<>(); + } + + public String getPipeName() { + return pipeName; + } + + public long getCreationTime() { + return creationTime; + } + + public synchronized void combine(int regionId, String combineId, State state) { + combineId2Combiner + .computeIfAbsent(combineId, id -> new Combiner(operator, expectedRegionIdSet)) + .combine(regionId, state); + } + + public synchronized FetchCombineResultResponse fetchCombineResult(List<String> combineIdList) + throws IOException { + final Map<String, FetchCombineResultResponse.CombineResultType> combineId2ResultType = + new HashMap<>(); + for (String combineId : combineIdList) { + final Combiner combiner = combineId2Combiner.get(combineId); + + if (combiner == null || combiner.isOutdated()) { + combineId2ResultType.put(combineId, FetchCombineResultResponse.CombineResultType.OUTDATED); + continue; + } + + combineId2ResultType.put( + combineId, + combiner.isComplete() + ? FetchCombineResultResponse.CombineResultType.SUCCESS + : FetchCombineResultResponse.CombineResultType.INCOMPLETE); + } + + return FetchCombineResultResponse.toTPipeTransferResp(combineId2ResultType); + } + + public synchronized void fetchExpectedRegionIdSet() { + expectedRegionIdSet.clear(); + expectedRegionIdSet.addAll(PipeAgent.task().getPipeTaskRegionIdSet(pipeName, creationTime)); + } + + public synchronized void cleanOutdatedCombiner() { + combineId2Combiner.entrySet().removeIf(entry -> entry.getValue().isOutdated()); + } + + public synchronized void close() { + combineId2Combiner.clear(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java new file mode 100644 index 00000000000..185cf840a20 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.combiner; + +import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse; +import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class PipeCombineHandlerManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeCombineHandlerManager.class); + + private final ConcurrentMap<String, PipeCombineHandler> pipeId2CombineHandler = + new ConcurrentHashMap<>(); + private final ConcurrentMap<String, AtomicInteger> pipeId2ReferenceCount = + new ConcurrentHashMap<>(); + + public synchronized void register(String pipeName, long creationTime, Operator operator) { + final String pipeId = generatePipeId(pipeName, creationTime); + + pipeId2CombineHandler.putIfAbsent( + pipeId, new PipeCombineHandler(pipeName, creationTime, operator)); + pipeId2ReferenceCount.putIfAbsent(pipeId, new AtomicInteger(0)); + + pipeId2ReferenceCount.get(pipeId).incrementAndGet(); + } + + public synchronized void deregister(String pipeName, long creationTime) { + final String pipeId = generatePipeId(pipeName, creationTime); + + if (pipeId2ReferenceCount.containsKey(pipeId) + && pipeId2ReferenceCount.get(pipeId).decrementAndGet() <= 0) { + pipeId2ReferenceCount.remove(pipeId); + try { + pipeId2CombineHandler.remove(pipeId).close(); + } catch (Exception e) { + LOGGER.warn("Error occurred when closing CombineHandler(id = {})", pipeId, e); + } + } + } + + public FetchCombineResultResponse handle(FetchCombineResultRequest fetchCombineResultRequest) + throws IOException { + final String pipeId = + generatePipeId( + fetchCombineResultRequest.getPipeName(), fetchCombineResultRequest.getCreationTime()); + + final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId); + if (Objects.isNull(handler)) { + throw new PipeException("CombineHandler not found for pipeId = " + pipeId); + } + + return handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList()); + } + + public TPipeTransferResp handle(CombineRequest combineRequest) { + final String pipeId = + generatePipeId(combineRequest.getPipeName(), combineRequest.getCreationTime()); + + final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId); + if (Objects.isNull(handler)) { + throw new PipeException("CombineHandler not found for pipeId = " + pipeId); + } + + handler.combine( + combineRequest.getRegionId(), combineRequest.getCombineId(), combineRequest.getState()); + return new TPipeTransferResp().setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + } + + public void fetchExpectedRegionIdSetAndCleanOutdatedCombiner() { + final Map<String, PipeCombineHandler> pipeId2CombineHandlerSnapshot; + synchronized (this) { + pipeId2CombineHandlerSnapshot = new HashMap<>(pipeId2CombineHandler); + } + + pipeId2CombineHandlerSnapshot.forEach( + (pipeId, handler) -> { + handler.fetchExpectedRegionIdSet(); + handler.cleanOutdatedCombiner(); + }); + } + + private static String generatePipeId(String pipeName, long creationTime) { + return pipeName + "-" + creationTime; + } + + /////////////////////////////// Singleton /////////////////////////////// + + private PipeCombineHandlerManager() { + PipeAgent.runtime() + .registerPeriodicalJob( + "CombineHandlerManager#fetchExpectedRegionIdSetAndCleanOutdatedCombiner", + this::fetchExpectedRegionIdSetAndCleanOutdatedCombiner, + 600); + } + + private static class CombineHandlerManagerHolder { + private static final PipeCombineHandlerManager INSTANCE = new PipeCombineHandlerManager(); + } + + public static PipeCombineHandlerManager getInstance() { + return CombineHandlerManagerHolder.INSTANCE; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java new file mode 100644 index 00000000000..f02cd691822 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload; + +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; +import org.apache.iotdb.db.pipe.processor.twostage.state.State; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class CombineRequest extends TPipeTransferReq { + + private String pipeName; + private long creationTime; + private int regionId; + private String combineId; + + private State state; + + private CombineRequest() { + // Empty constructor + } + + public String getPipeName() { + return pipeName; + } + + public long getCreationTime() { + return creationTime; + } + + public int getRegionId() { + return regionId; + } + + public String getCombineId() { + return combineId; + } + + public State getState() { + return state; + } + + public static CombineRequest toTPipeTransferReq( + String pipeName, long creationTime, int regionId, String combineId, State state) + throws IOException { + return new CombineRequest() + .convertToTPipeTransferReq(pipeName, creationTime, regionId, combineId, state); + } + + public static CombineRequest fromTPipeTransferReq(TPipeTransferReq transferReq) throws Exception { + return new CombineRequest().translateFromTPipeTransferReq(transferReq); + } + + private CombineRequest convertToTPipeTransferReq( + String pipeName, long creationTime, int regionId, String combineId, State state) + throws IOException { + this.pipeName = pipeName; + this.creationTime = creationTime; + this.regionId = regionId; + this.state = state; + this.combineId = combineId; + + this.version = IoTDBConnectorRequestVersion.VERSION_2.getVersion(); + this.type = RequestType.COMBINE.getType(); + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(pipeName, outputStream); + ReadWriteIOUtils.write(creationTime, outputStream); + ReadWriteIOUtils.write(regionId, outputStream); + + ReadWriteIOUtils.write(state.getClass().getName(), outputStream); + state.serialize(outputStream); + + this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + + return this; + } + + private CombineRequest translateFromTPipeTransferReq(TPipeTransferReq transferReq) + throws Exception { + pipeName = ReadWriteIOUtils.readString(transferReq.body); + creationTime = ReadWriteIOUtils.readLong(transferReq.body); + regionId = ReadWriteIOUtils.readInt(transferReq.body); + combineId = ReadWriteIOUtils.readString(transferReq.body); + + final String stateClassName = ReadWriteIOUtils.readString(transferReq.body); + state = (State) Class.forName(stateClassName).newInstance(); + state.deserialize(transferReq.body); + + version = transferReq.version; + type = transferReq.type; + body = transferReq.body; + + return this; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java new file mode 100644 index 00000000000..ed1e2e13963 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload; + +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class FetchCombineResultRequest extends TPipeTransferReq { + + private String pipeName; + private long creationTime; + private List<String> combineIdList; + + private FetchCombineResultRequest() { + // Empty constructor + } + + public String getPipeName() { + return pipeName; + } + + public long getCreationTime() { + return creationTime; + } + + public List<String> getCombineIdList() { + return combineIdList; + } + + public static FetchCombineResultRequest toTPipeTransferReq( + String pipeName, long creationTime, List<String> combineIdList) throws IOException { + return new FetchCombineResultRequest() + .convertToTPipeTransferReq(pipeName, creationTime, combineIdList); + } + + public static FetchCombineResultRequest fromTPipeTransferReq(TPipeTransferReq transferReq) + throws Exception { + return new FetchCombineResultRequest().translateFromTPipeTransferReq(transferReq); + } + + private FetchCombineResultRequest convertToTPipeTransferReq( + String pipeName, long creationTime, List<String> combineIdList) throws IOException { + this.pipeName = pipeName; + this.creationTime = creationTime; + this.combineIdList = combineIdList; + + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(pipeName, outputStream); + ReadWriteIOUtils.write(creationTime, outputStream); + + ReadWriteIOUtils.write(combineIdList.size(), outputStream); + for (String combineId : combineIdList) { + ReadWriteIOUtils.write(combineId, outputStream); + } + + this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + + return this; + } + + private FetchCombineResultRequest translateFromTPipeTransferReq(TPipeTransferReq transferReq) + throws Exception { + pipeName = ReadWriteIOUtils.readString(transferReq.body); + creationTime = ReadWriteIOUtils.readLong(transferReq.body); + + combineIdList = new ArrayList<>(); + final int combineIdListSize = ReadWriteIOUtils.readInt(transferReq.body); + for (int i = 0; i < combineIdListSize; i++) { + combineIdList.add(ReadWriteIOUtils.readString(transferReq.body)); + } + + version = transferReq.version; + type = transferReq.type; + body = transferReq.body; + + return this; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java new file mode 100644 index 00000000000..a40309520b7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload; + +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class FetchCombineResultResponse extends TPipeTransferResp { + + public enum CombineResultType { + SUCCESS, + INCOMPLETE, + OUTDATED, + } + + private Map<String, CombineResultType> combineId2ResultType = new HashMap<>(); + + private FetchCombineResultResponse() { + // Empty constructor + } + + public Map<String, CombineResultType> getCombineId2ResultType() { + return combineId2ResultType; + } + + public static FetchCombineResultResponse toTPipeTransferResp( + Map<String, CombineResultType> combineId2ResultType) throws IOException { + final FetchCombineResultResponse response = new FetchCombineResultResponse(); + + response.combineId2ResultType = combineId2ResultType; + + try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(combineId2ResultType.size(), outputStream); + for (Map.Entry<String, CombineResultType> entry : combineId2ResultType.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue().ordinal(), outputStream); + } + + response.body = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + + return response; + } + + public static FetchCombineResultResponse fromTPipeTransferResp(TPipeTransferResp transferResp) { + final FetchCombineResultResponse response = new FetchCombineResultResponse(); + + response.status = transferResp.status; + + response.combineId2ResultType = new HashMap<>(); + if (response.isSetBody()) { + final int size = ReadWriteIOUtils.readInt(transferResp.body); + for (int i = 0; i < size; i++) { + final String combineId = ReadWriteIOUtils.readString(transferResp.body); + final CombineResultType resultType = + CombineResultType.values()[ReadWriteIOUtils.readInt(transferResp.body)]; + response.combineId2ResultType.put(combineId, resultType); + } + } + + return response; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java similarity index 50% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java index 9f853dd403e..99f2b16cbb3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java @@ -17,19 +17,39 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.connector.payload.thrift.request; +package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload; -public enum IoTDBConnectorRequestVersion { - VERSION_1((byte) 1), +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public enum RequestType { + COMBINE((short) 0), + FETCH_COMBINE_RESULT((short) 1), ; - private final byte version; + private final short type; + + RequestType(short type) { + this.type = type; + } + + public short getType() { + return type; + } + + private static final Map<Short, RequestType> TYPE_MAP = + Arrays.stream(RequestType.values()) + .collect( + HashMap::new, + (typeMap, pipeRequestType) -> typeMap.put(pipeRequestType.getType(), pipeRequestType), + HashMap::putAll); - IoTDBConnectorRequestVersion(byte type) { - this.version = type; + public static boolean isValidatedRequestType(short type) { + return TYPE_MAP.containsKey(type); } - public byte getVersion() { - return version; + public static RequestType valueOf(short type) { + return TYPE_MAP.get(type); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java new file mode 100644 index 00000000000..a65abaefe52 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver; + +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; +import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver; +import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.RequestType; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TwoStageAggregateReceiver implements IoTDBReceiver { + + private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateReceiver.class); + + @Override + public IoTDBConnectorRequestVersion getVersion() { + return IoTDBConnectorRequestVersion.VERSION_2; + } + + @Override + public TPipeTransferResp receive(TPipeTransferReq req) { + try { + final short rawRequestType = req.getType(); + if (RequestType.isValidatedRequestType(rawRequestType)) { + switch (RequestType.valueOf(rawRequestType)) { + case COMBINE: + return PipeCombineHandlerManager.getInstance() + .handle(CombineRequest.fromTPipeTransferReq(req)); + case FETCH_COMBINE_RESULT: + return PipeCombineHandlerManager.getInstance() + .handle(FetchCombineResultRequest.fromTPipeTransferReq(req)); + default: + break; + } + } + + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.PIPE_TYPE_ERROR, + String.format("Unknown request type %s.", rawRequestType))); + } catch (Exception e) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.PIPE_ERROR, + String.format("Error occurs when receiving request: %s.", e.getMessage()))); + } + } + + @Override + public void handleExit() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Two stage aggregate receiver is exiting."); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java similarity index 63% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java index 9f853dd403e..5af3c398c81 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java @@ -17,19 +17,23 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.connector.payload.thrift.request; +package org.apache.iotdb.db.pipe.processor.twostage.operator; -public enum IoTDBConnectorRequestVersion { - VERSION_1((byte) 1), - ; +import org.apache.iotdb.db.pipe.processor.twostage.state.CountState; +import org.apache.iotdb.db.pipe.processor.twostage.state.State; - private final byte version; +public class CountOperator implements Operator { - IoTDBConnectorRequestVersion(byte type) { - this.version = type; + private long count = 0; + + @Override + public void combine(State state) { + final CountState countState = (CountState) state; + count += countState.getCount(); } - public byte getVersion() { - return version; + @Override + public void onComplete() { + // output to queue } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java similarity index 73% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java index 9f853dd403e..885c565442b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java @@ -17,19 +17,13 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.connector.payload.thrift.request; +package org.apache.iotdb.db.pipe.processor.twostage.operator; -public enum IoTDBConnectorRequestVersion { - VERSION_1((byte) 1), - ; +import org.apache.iotdb.db.pipe.processor.twostage.state.State; - private final byte version; +public interface Operator { - IoTDBConnectorRequestVersion(byte type) { - this.version = type; - } + void combine(State state); - public byte getVersion() { - return version; - } + void onComplete(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java similarity index 55% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java index 9f853dd403e..0bb826fd13a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java @@ -17,19 +17,37 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.connector.payload.thrift.request; +package org.apache.iotdb.db.pipe.processor.twostage.state; -public enum IoTDBConnectorRequestVersion { - VERSION_1((byte) 1), - ; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - private final byte version; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; - IoTDBConnectorRequestVersion(byte type) { - this.version = type; +public class CountState implements State { + + private long count; + + CountState() { + count = 0; + } + + public void accumulate(int value) { + count += value; + } + + public long getCount() { + return count; + } + + @Override + public void serialize(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(count, outputStream); } - public byte getVersion() { - return version; + @Override + public void deserialize(ByteBuffer byteBuffer) { + count = ReadWriteIOUtils.readLong(byteBuffer); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java similarity index 73% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java index 9f853dd403e..36d393e9ffa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java @@ -17,19 +17,15 @@ * under the License. */ -package org.apache.iotdb.commons.pipe.connector.payload.thrift.request; +package org.apache.iotdb.db.pipe.processor.twostage.state; -public enum IoTDBConnectorRequestVersion { - VERSION_1((byte) 1), - ; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; - private final byte version; +public interface State { - IoTDBConnectorRequestVersion(byte type) { - this.version = type; - } + void serialize(OutputStream outputStream) throws IOException; - public byte getVersion() { - return version; - } + void deserialize(ByteBuffer byteBuffer); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java index 932123a4260..e0016eceac2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.receiver.protocol.thrift; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver; import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent; +import org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver.TwoStageAggregateReceiver; public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent { @@ -31,6 +32,8 @@ public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent { protected void initConstructors() { RECEIVER_CONSTRUCTORS.put( IoTDBConnectorRequestVersion.VERSION_1.getVersion(), IoTDBDataNodeReceiver::new); + RECEIVER_CONSTRUCTORS.put( + IoTDBConnectorRequestVersion.VERSION_2.getVersion(), TwoStageAggregateReceiver::new); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java index 9f853dd403e..74219ef6b29 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.connector.payload.thrift.request; public enum IoTDBConnectorRequestVersion { VERSION_1((byte) 1), + VERSION_2((byte) 2), ; private final byte version;
