This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load_v2 by this push:
new e852ea367d5 add TsFileSplitSender
e852ea367d5 is described below
commit e852ea367d5aa139016409592038a439f427ffae
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Sep 6 11:28:50 2023 +0800
add TsFileSplitSender
---
.../execution/load/AlignedChunkData.java | 16 ++
.../execution/load/DataPartitionBatchFetcher.java | 70 +++++
.../queryengine/execution/load/DeletionData.java | 11 +
.../execution/load/MergedTsFileSplitter.java | 105 ++++---
.../execution/load/NonAlignedChunkData.java | 16 ++
.../db/queryengine/execution/load/TsFileData.java | 4 +
.../execution/load/TsFileDataManager.java | 162 +++++++++++
.../execution/load/TsFileSplitSender.java | 296 +++++++++++++++++++
.../queryengine/execution/load/TsFileSplitter.java | 18 +-
.../planner/plan/node/load/LoadTsFileNode.java | 8 +
.../scheduler/load/LoadTsFileDispatcherImpl.java | 2 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 185 ++----------
.../apache/iotdb/db/utils/TimePartitionUtils.java | 6 +
.../execution/load/MergedTsFileSplitterTest.java | 35 ++-
.../db/queryengine/execution/load/TestBase.java | 317 ++++++++++++++++++---
.../execution/load/TsFileSplitSenderTest.java | 186 ++++++++++++
.../execution/load/TsFileSplitterTest.java | 15 +-
.../apache/iotdb/commons/client/ClientManager.java | 2 +-
.../sync/SyncDataNodeInternalServiceClient.java | 15 +
.../src/main/thrift/datanode.thrift | 4 +
20 files changed, 1200 insertions(+), 273 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
index 6fb25fec0c7..630ba5f2fcb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
@@ -72,6 +72,7 @@ public class AlignedChunkData implements ChunkData {
private AlignedChunkWriterImpl chunkWriter;
private List<Chunk> chunkList;
+ private int splitId;
public AlignedChunkData(
String device, ChunkHeader chunkHeader, TTimePartitionSlot
timePartitionSlot) {
@@ -151,6 +152,7 @@ public class AlignedChunkData implements ChunkData {
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
byteStream.writeTo(stream);
+ ReadWriteIOUtils.write(splitId, stream);
close();
}
@@ -426,6 +428,8 @@ public class AlignedChunkData implements ChunkData {
chunkData.pageNumbers = pageNumbers;
chunkData.deserializeTsFileData(stream);
chunkData.close();
+
+ chunkData.setSplitId(ReadWriteIOUtils.readInt(stream));
return chunkData;
}
@@ -453,6 +457,18 @@ public class AlignedChunkData implements ChunkData {
+ dataSize
+ ", needDecodeChunk="
+ needDecodeChunk
+ + ", splitId="
+ + splitId
+ '}';
}
+
+ @Override
+ public int getSplitId() {
+ return splitId;
+ }
+
+ @Override
+ public void setSplitId(int sid) {
+ this.splitId = sid;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
new file mode 100644
index 00000000000..f969a2746a6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DataPartitionBatchFetcher {
+
+ private final IPartitionFetcher fetcher;
+
+ public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
+ this.fetcher = fetcher;
+ }
+
+ public List<TRegionReplicaSet> queryDataPartition(
+ List<Pair<String, TTimePartitionSlot>> slotList) {
+ List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+ int size = slotList.size();
+
+ for (int i = 0; i < size; i += LoadTsFileScheduler.TRANSMIT_LIMIT) {
+ List<Pair<String, TTimePartitionSlot>> subSlotList =
+ slotList.subList(i, Math.min(size, i +
LoadTsFileScheduler.TRANSMIT_LIMIT));
+ DataPartition dataPartition =
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList));
+ replicaSets.addAll(
+ subSlotList.stream()
+ .map(pair ->
dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right))
+ .collect(Collectors.toList()));
+ }
+ return replicaSets;
+ }
+
+ private List<DataPartitionQueryParam> toQueryParam(List<Pair<String,
TTimePartitionSlot>> slots) {
+ return slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ Pair::getLeft, Collectors.mapping(Pair::getRight,
Collectors.toSet())))
+ .entrySet()
+ .stream()
+ .map(
+ entry -> new DataPartitionQueryParam(entry.getKey(), new
ArrayList<>(entry.getValue())))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
index 3f486af4adb..973cd73e5a3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
@@ -33,6 +33,7 @@ import java.io.InputStream;
public class DeletionData implements TsFileData {
private final Deletion deletion;
+ private int splitId;
public DeletionData(Deletion deletion) {
this.deletion = deletion;
@@ -69,4 +70,14 @@ public class DeletionData implements TsFileData {
throws IllegalPathException, IOException {
return new DeletionData(Deletion.deserializeWithoutFileOffset(new
DataInputStream(stream)));
}
+
+ @Override
+ public int getSplitId() {
+ return splitId;
+ }
+
+ @Override
+ public void setSplitId(int sid) {
+ this.splitId = sid;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
index 1a969ebe766..c5d4700823d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
@@ -19,18 +19,7 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import java.io.DataOutputStream;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -55,11 +44,12 @@ import
org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -73,6 +63,13 @@ import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
public class MergedTsFileSplitter {
@@ -83,16 +80,22 @@ public class MergedTsFileSplitter {
private final Function<TsFileData, Boolean> consumer;
private final PriorityQueue<SplitTask> taskPriorityQueue;
private ExecutorService asyncExecutor;
-
- public MergedTsFileSplitter(List<File> tsFiles, Function<TsFileData,
Boolean> consumer, ExecutorService asyncExecutor) {
+ private long timePartitionInterval;
+ private AtomicInteger splitIdGenerator = new AtomicInteger();
+
+ public MergedTsFileSplitter(
+ List<File> tsFiles,
+ Function<TsFileData, Boolean> consumer,
+ ExecutorService asyncExecutor,
+ long timePartitionInterval) {
this.tsFiles = tsFiles;
this.consumer = consumer;
this.asyncExecutor = asyncExecutor;
+ this.timePartitionInterval = timePartitionInterval;
taskPriorityQueue = new PriorityQueue<>();
}
- public void splitTsFileByDataPartition()
- throws IOException, IllegalStateException {
+ public void splitTsFileByDataPartition() throws IOException,
IllegalStateException {
for (File tsFile : tsFiles) {
SplitTask splitTask = new SplitTask(tsFile, asyncExecutor);
if (splitTask.hasNext()) {
@@ -115,6 +118,7 @@ public class MergedTsFileSplitter {
for (SplitTask equalTask : equalTasks) {
TsFileData tsFileData = equalTask.removeNext();
+ tsFileData.setSplitId(splitIdGenerator.incrementAndGet());
consumer.apply(tsFileData);
if (equalTask.hasNext()) {
taskPriorityQueue.add(equalTask);
@@ -203,15 +207,17 @@ public class MergedTsFileSplitter {
nextSplits = new LinkedBlockingDeque<>(64);
if (asyncExecutor != null) {
- asyncTask = asyncExecutor.submit(() -> {
- try {
- asyncComputeNext();
- } catch (Throwable e) {
- logger.info("Exception during splitting", e);
- throw e;
- }
- return null;
- });
+ asyncTask =
+ asyncExecutor.submit(
+ () -> {
+ try {
+ asyncComputeNext();
+ } catch (Throwable e) {
+ logger.info("Exception during splitting", e);
+ throw e;
+ }
+ return null;
+ });
}
}
@@ -336,7 +342,8 @@ public class MergedTsFileSplitter {
== TsFileConstant.TIME_COLUMN_MASK);
IChunkMetadata chunkMetadata =
offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
TTimePartitionSlot timePartitionSlot =
-
TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime());
+ TimePartitionUtils.getTimePartition(
+ chunkMetadata.getStartTime(), timePartitionInterval);
ChunkData chunkData =
ChunkData.createChunkData(isAligned, curDevice, header,
timePartitionSlot);
@@ -379,7 +386,7 @@ public class MergedTsFileSplitter {
? chunkMetadata.getStartTime()
: pageHeader.getStartTime();
TTimePartitionSlot pageTimePartitionSlot =
- TimePartitionUtils.getTimePartition(startTime);
+ TimePartitionUtils.getTimePartition(startTime,
timePartitionInterval);
if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
if (!isAligned) {
insertNewChunk(chunkData);
@@ -406,9 +413,7 @@ public class MergedTsFileSplitter {
}
int start = 0;
- long endTime =
- timePartitionSlot.getStartTime()
- + TimePartitionUtils.getTimePartitionInterval();
+ long endTime = timePartitionSlot.getStartTime() +
timePartitionInterval;
for (int i = 0; i < times.length; i++) {
if (times[i] >= endTime) {
chunkData.writeDecodePage(times, values, start, i);
@@ -420,10 +425,9 @@ public class MergedTsFileSplitter {
insertNewChunk(chunkData);
}
- timePartitionSlot =
TimePartitionUtils.getTimePartition(times[i]);
- endTime =
- timePartitionSlot.getStartTime()
- + TimePartitionUtils.getTimePartitionInterval();
+ timePartitionSlot =
+ TimePartitionUtils.getTimePartition(times[i],
timePartitionInterval);
+ endTime = timePartitionSlot.getStartTime() +
timePartitionInterval;
chunkData =
ChunkData.createChunkData(isAligned, curDevice,
header, timePartitionSlot);
start = i;
@@ -531,9 +535,7 @@ public class MergedTsFileSplitter {
}
@Override
- public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
-
- }
+ public void writeToFileWriter(TsFileIOWriter writer) throws IOException
{}
@Override
public boolean isModification() {
@@ -541,9 +543,15 @@ public class MergedTsFileSplitter {
}
@Override
- public void serialize(DataOutputStream stream) throws IOException {
+ public void serialize(DataOutputStream stream) throws IOException {}
+ @Override
+ public int getSplitId() {
+ return 0;
}
+
+ @Override
+ public void setSplitId(int sid) {}
}
private void getAllModification(Map<Long, List<Deletion>>
offset2Deletions) throws IOException {
@@ -616,17 +624,24 @@ public class MergedTsFileSplitter {
}
private boolean needDecodeChunk(IChunkMetadata chunkMetadata) {
- return !TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime())
-
.equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime()));
+ return !TimePartitionUtils.getTimePartition(
+ chunkMetadata.getStartTime(), timePartitionInterval)
+ .equals(
+ TimePartitionUtils.getTimePartition(
+ chunkMetadata.getEndTime(), timePartitionInterval));
}
private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata
chunkMetadata) {
if (pageHeader.getStatistics() == null) {
- return
!TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime())
-
.equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime()));
- }
- return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime())
-
.equals(TimePartitionUtils.getTimePartition(pageHeader.getEndTime()));
+ return !TimePartitionUtils.getTimePartition(
+ chunkMetadata.getStartTime(), timePartitionInterval)
+ .equals(
+ TimePartitionUtils.getTimePartition(
+ chunkMetadata.getEndTime(), timePartitionInterval));
+ }
+ return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime(),
timePartitionInterval)
+ .equals(
+ TimePartitionUtils.getTimePartition(pageHeader.getEndTime(),
timePartitionInterval));
}
private Pair<long[], Object[]> decodePage(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
index 6bfcdfadf44..ab7a3f2e406 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
@@ -57,6 +57,7 @@ public class NonAlignedChunkData implements ChunkData {
private ChunkWriterImpl chunkWriter;
private Chunk chunk;
+ private int splitId;
public NonAlignedChunkData(
String device, ChunkHeader chunkHeader, TTimePartitionSlot
timePartitionSlot) {
@@ -121,6 +122,7 @@ public class NonAlignedChunkData implements ChunkData {
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
byteStream.writeTo(stream);
+ ReadWriteIOUtils.write(splitId, stream);
close();
}
@@ -309,6 +311,8 @@ public class NonAlignedChunkData implements ChunkData {
chunkData.pageNumber = pageNumber;
chunkData.deserializeTsFileData(stream);
chunkData.close();
+
+ chunkData.setSplitId(ReadWriteIOUtils.readInt(stream));
return chunkData;
}
@@ -336,6 +340,18 @@ public class NonAlignedChunkData implements ChunkData {
+ chunkHeader
+ ", needDecodeChunk="
+ needDecodeChunk
+ + ", splitId="
+ + splitId
+ '}';
}
+
+ @Override
+ public int getSplitId() {
+ return splitId;
+ }
+
+ @Override
+ public void setSplitId(int sid) {
+ this.splitId = sid;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
index a53a3f5fc30..79b96471623 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
@@ -37,6 +37,10 @@ public interface TsFileData {
void serialize(DataOutputStream stream) throws IOException;
+ int getSplitId();
+
+ void setSplitId(int sid);
+
static TsFileData deserialize(InputStream stream)
throws IOException, PageException, IllegalPathException {
boolean isModification = ReadWriteIOUtils.readBool(stream);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
new file mode 100644
index 00000000000..cc2c2c9556a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * TsFileDataManager batches splits generated by TsFileSplitter as
LoadTsFilePieceNode, route the
+ * splits to associated replica set, and sends them to the replicas with the
provided dispatching
+ * function.
+ */
+@SuppressWarnings("BooleanMethodIsAlwaysInverted")
+public class TsFileDataManager {
+
+ private static final Logger logger =
LoggerFactory.getLogger(TsFileDataManager.class);
+ private final DispatchFunction dispatchFunction;
+ private final DataPartitionBatchFetcher partitionBatchFetcher;
+ private final PlanNodeId planNodeId;
+ private final File targetFile;
+
+ private long dataSize;
+ private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
+ private final List<ChunkData> nonDirectionalChunkData;
+
+ @FunctionalInterface
+ public interface DispatchFunction {
+
+ boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode,
TRegionReplicaSet replicaSet);
+ }
+
+ public TsFileDataManager(
+ DispatchFunction dispatchFunction,
+ PlanNodeId planNodeId,
+ File targetFile,
+ DataPartitionBatchFetcher partitionBatchFetcher) {
+ this.dispatchFunction = dispatchFunction;
+ this.planNodeId = planNodeId;
+ this.targetFile = targetFile;
+ this.dataSize = 0;
+ this.replicaSet2Piece = new HashMap<>();
+ this.nonDirectionalChunkData = new ArrayList<>();
+ this.partitionBatchFetcher = partitionBatchFetcher;
+ }
+
+ public boolean addOrSendTsFileData(TsFileData tsFileData) {
+ return tsFileData.isModification()
+ ? addOrSendDeletionData(tsFileData)
+ : addOrSendChunkData((ChunkData) tsFileData);
+ }
+
+ private boolean addOrSendChunkData(ChunkData chunkData) {
+ nonDirectionalChunkData.add(chunkData);
+ dataSize += chunkData.getDataSize();
+
+ if (dataSize > LoadTsFileScheduler.MAX_MEMORY_SIZE) {
+ routeChunkData();
+
+ // start to dispatch from the biggest TsFilePieceNode
+ List<TRegionReplicaSet> sortedReplicaSets =
+ replicaSet2Piece.keySet().stream()
+ .sorted(
+ Comparator.comparingLong(o ->
replicaSet2Piece.get(o).getDataSize()).reversed())
+ .collect(Collectors.toList());
+
+ for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
+ LoadTsFilePieceNode pieceNode = replicaSet2Piece.get(sortedReplicaSet);
+ if (pieceNode.getDataSize() == 0) { // total data size has been
reduced to 0
+ break;
+ }
+ if (!dispatchFunction.dispatchOnePieceNode(pieceNode,
sortedReplicaSet)) {
+ return false;
+ }
+
+ dataSize -= pieceNode.getDataSize();
+ replicaSet2Piece.put(
+ sortedReplicaSet,
+ new LoadTsFilePieceNode(
+ planNodeId, targetFile)); // can not just remove, because of
deletion
+ if (dataSize <= LoadTsFileScheduler.MAX_MEMORY_SIZE) {
+ break;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ private void routeChunkData() {
+ if (nonDirectionalChunkData.isEmpty()) {
+ return;
+ }
+
+ List<TRegionReplicaSet> replicaSets =
+ partitionBatchFetcher.queryDataPartition(
+ nonDirectionalChunkData.stream()
+ .map(data -> new Pair<>(data.getDevice(),
data.getTimePartitionSlot()))
+ .collect(Collectors.toList()));
+ IntStream.range(0, nonDirectionalChunkData.size())
+ .forEach(
+ i ->
+ replicaSet2Piece
+ .computeIfAbsent(
+ replicaSets.get(i), o -> new
LoadTsFilePieceNode(planNodeId, targetFile))
+ .addTsFileData(nonDirectionalChunkData.get(i)));
+ nonDirectionalChunkData.clear();
+ }
+
+ private boolean addOrSendDeletionData(TsFileData deletionData) {
+ routeChunkData(); // ensure chunk data will be added before deletion
+
+ for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
+ dataSize += deletionData.getDataSize();
+ entry.getValue().addTsFileData(deletionData);
+ }
+ return true;
+ }
+
+ public boolean sendAllTsFileData() {
+ routeChunkData();
+
+ for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
+ if (!dispatchFunction.dispatchOnePieceNode(entry.getValue(),
entry.getKey())) {
+ logger.warn("Dispatch piece node {} of TsFile {} error.",
entry.getValue(), targetFile);
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
new file mode 100644
index 00000000000..06dc76881d3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
+
+public class TsFileSplitSender {
+
+ private static final int MAX_RETRY = 5;
+ private static final long RETRY_INTERVAL_MS = 6_000L;
+ private static final Logger logger =
LoggerFactory.getLogger(TsFileSplitSender.class);
+
+ private LoadTsFileNode loadTsFileNode;
+ private DataPartitionBatchFetcher targetPartitionFetcher;
+ private long targetPartitionInterval;
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+ // All consensus groups accessed in Phase1 should be notified in Phase2
+ private final Set<TRegionReplicaSet> allReplicaSets = new
ConcurrentSkipListSet<>();
+ private String uuid;
+ private Map<TDataNodeLocation, Double> dataNodeThroughputMap = new
ConcurrentHashMap<>();
+ private Random random = new Random();
+ private boolean isGeneratedByPipe;
+ private Map<Pair<LoadTsFilePieceNode, TRegionReplicaSet>, Exception>
phaseOneFailures =
+ new ConcurrentHashMap<>();
+ private Map<TRegionReplicaSet, Exception> phaseTwoFailures = new HashMap<>();
+
+ public TsFileSplitSender(
+ LoadTsFileNode loadTsFileNode,
+ DataPartitionBatchFetcher targetPartitionFetcher,
+ long targetPartitionInterval,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager,
+ boolean isGeneratedByPipe) {
+ this.loadTsFileNode = loadTsFileNode;
+ this.targetPartitionFetcher = targetPartitionFetcher;
+ this.targetPartitionInterval = targetPartitionInterval;
+ this.internalServiceClientManager = internalServiceClientManager;
+ this.isGeneratedByPipe = isGeneratedByPipe;
+ }
+
+ public void start() throws IOException {
+ // skip files without data
+ loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
+ uuid = UUID.randomUUID().toString();
+
+ boolean isFirstPhaseSuccess = firstPhase(loadTsFileNode);
+ boolean isSecondPhaseSuccess = secondPhase(isFirstPhaseSuccess);
+ if (isFirstPhaseSuccess && isSecondPhaseSuccess) {
+ logger.info("Load TsFiles {} Successfully",
loadTsFileNode.getResources());
+ } else {
+ logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
+ }
+ }
+
+ private boolean firstPhase(LoadTsFileNode node) throws IOException {
+ TsFileDataManager tsFileDataManager =
+ new TsFileDataManager(
+ this::dispatchOnePieceNode,
+ node.getPlanNodeId(),
+ node.lastResource().getTsFile(),
+ targetPartitionFetcher);
+ ExecutorService executorService =
+ IoTDBThreadPoolFactory.newThreadPool(
+ 16,
+ Integer.MAX_VALUE,
+ 20,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new IoTThreadFactory("MergedTsFileSplitter"),
+ "MergedTsFileSplitter");
+ new MergedTsFileSplitter(
+ node.getResources().stream()
+ .map(TsFileResource::getTsFile)
+ .collect(Collectors.toList()),
+ tsFileDataManager::addOrSendTsFileData,
+ executorService,
+ targetPartitionInterval)
+ .splitTsFileByDataPartition();
+ return tsFileDataManager.sendAllTsFileData() && phaseOneFailures.isEmpty();
+ }
+
+ private boolean secondPhase(boolean isFirstPhaseSuccess) {
+ logger.info("Start dispatching Load command for uuid {}", uuid);
+ TLoadCommandReq loadCommandReq =
+ new TLoadCommandReq(
+ (isFirstPhaseSuccess ? LoadCommand.EXECUTE :
LoadCommand.ROLLBACK).ordinal(), uuid);
+ loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
+
+ for (TRegionReplicaSet replicaSet : allReplicaSets) {
+ loadCommandReq.setUseConsensus(true);
+ for (TDataNodeLocation dataNodeLocation :
replicaSet.getDataNodeLocations()) {
+ TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
+ Exception groupException = null;
+ loadCommandReq.setConsensusGroupId(replicaSet.getRegionId());
+
+ for (int i = 0; i < MAX_RETRY; i++) {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+ if (!loadResp.isAccepted()) {
+ logger.warn(loadResp.message);
+ groupException = new
FragmentInstanceDispatchException(loadResp.status);
+ }
+ break;
+ } catch (ClientManagerException | TException e) {
+ logger.warn(NODE_CONNECTION_ERROR, endPoint, e);
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage(
+ "can't connect to node {}, please reset longer
dn_connection_timeout_ms "
+ + "in iotdb-common.properties and restart iotdb."
+ + endPoint);
+ groupException = new FragmentInstanceDispatchException(status);
+ }
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ groupException = e;
+ break;
+ }
+ }
+
+ if (groupException != null) {
+ phaseTwoFailures.put(replicaSet, groupException);
+ } else {
+ break;
+ }
+ }
+ }
+
+ return phaseTwoFailures.isEmpty();
+ }
+
+ /**
+ * The rank (probability of being chosen) is calculated as throughput /
totalThroughput for those
+ * nodes that have not been used, their throughput is defined as
Float.MAX_VALUE
+ *
+ * @param replicaSet replica set to be ranked
+ * @return the nodes and their ranks
+ */
+ private List<Pair<TDataNodeLocation, Double>>
rankLocations(TRegionReplicaSet replicaSet) {
+ List<Pair<TDataNodeLocation, Double>> locations =
+ new ArrayList<>(replicaSet.dataNodeLocations.size());
+ // retrieve throughput of each node
+ double totalThroughput = 0.0;
+ for (TDataNodeLocation dataNodeLocation :
replicaSet.getDataNodeLocations()) {
+ // use Float.MAX_VALUE so that they can be added together
+ double throughput =
+ dataNodeThroughputMap.computeIfAbsent(dataNodeLocation, l ->
(double) Float.MAX_VALUE);
+ locations.add(new Pair<>(dataNodeLocation, throughput));
+ totalThroughput += throughput;
+ }
+ // calculate cumulative ranks
+ locations.get(0).right = locations.get(0).right / totalThroughput;
+ for (int i = 1; i < locations.size(); i++) {
+ Pair<TDataNodeLocation, Double> location = locations.get(i);
+ location.right = location.right / totalThroughput + locations.get(i -
1).right;
+ }
+ return locations;
+ }
+
+ private Pair<TDataNodeLocation, Double> chooseNextLocation(
+ List<Pair<TDataNodeLocation, Double>> locations) {
+ int chosen = 0;
+ double dice = random.nextDouble();
+ for (int i = 1; i < locations.size() - 1; i++) {
+ if (locations.get(i - 1).right <= dice && dice < locations.get(i).right)
{
+ chosen = i;
+ }
+ }
+ Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
+ // update ranks
+ for (Pair<TDataNodeLocation, Double> location : locations) {
+ location.right = location.right / (1 - chosenPair.right);
+ }
+ return chosenPair;
+ }
+
+ @SuppressWarnings("BusyWait")
+ public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode,
TRegionReplicaSet replicaSet) {
+ allReplicaSets.add(replicaSet);
+
+ TTsFilePieceReq loadTsFileReq =
+ new TTsFilePieceReq(pieceNode.serializeToByteBuffer(), uuid,
replicaSet.getRegionId());
+ loadTsFileReq.isRelay = true;
+ List<Pair<TDataNodeLocation, Double>> locations =
rankLocations(replicaSet);
+
+ long startTime = 0;
+ boolean loadSucceed = false;
+ Exception lastConnectionError = null;
+ TDataNodeLocation currLocation = null;
+ while (!locations.isEmpty()) {
+ // the chosen location is removed from the list
+ Pair<TDataNodeLocation, Double> locationRankPair =
chooseNextLocation(locations);
+ currLocation = locationRankPair.left;
+ startTime = System.currentTimeMillis();
+ for (int i = 0; i < MAX_RETRY; i++) {
+ try (SyncDataNodeInternalServiceClient client =
+
internalServiceClientManager.borrowClient(currLocation.internalEndPoint)) {
+ TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
+ if (!loadResp.isAccepted()) {
+ logger.warn(loadResp.message);
+ phaseOneFailures.put(
+ new Pair<>(pieceNode, replicaSet),
+ new FragmentInstanceDispatchException(loadResp.status));
+ return false;
+ }
+ loadSucceed = true;
+ break;
+ } catch (ClientManagerException | TException e) {
+ lastConnectionError = e;
+ }
+
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ if (loadSucceed) {
+ break;
+ }
+ }
+
+ if (!loadSucceed) {
+ String warning = NODE_CONNECTION_ERROR;
+ logger.warn(warning, locations, lastConnectionError);
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage(warning + locations);
+ phaseOneFailures.put(
+ new Pair<>(pieceNode, replicaSet), new
FragmentInstanceDispatchException(status));
+ return false;
+ }
+ long timeConsumption = System.currentTimeMillis() - startTime;
+ dataNodeThroughputMap.put(currLocation, pieceNode.getDataSize() * 1.0 /
timeConsumption);
+ return true;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
index 2c915c10941..5542e04cf65 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
@@ -58,6 +58,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
public class TsFileSplitter {
@@ -65,10 +66,16 @@ public class TsFileSplitter {
private final File tsFile;
private final Function<TsFileData, Boolean> consumer;
+ private final AtomicInteger splitIdGenerator = new AtomicInteger();
- public TsFileSplitter(File tsFile, Function<TsFileData, Boolean> consumer) {
+ public TsFileSplitter(File tsFile, Function<TsFileData, Boolean> consumer,
int startSplitId) {
this.tsFile = tsFile;
this.consumer = consumer;
+ splitIdGenerator.set(startSplitId);
+ }
+
+ public int getCurrentSplitId() {
+ return splitIdGenerator.get();
}
@SuppressWarnings({"squid:S3776", "squid:S6541"})
@@ -344,7 +351,12 @@ public class TsFileSplitter {
offset2Deletions
.pollFirstEntry()
.getValue()
- .forEach(o -> consumer.apply(new DeletionData(o)));
+ .forEach(
+ o -> {
+ DeletionData deletionData = new DeletionData(o);
+ deletionData.setSplitId(splitIdGenerator.incrementAndGet());
+ consumer.apply(deletionData);
+ });
}
}
@@ -359,6 +371,7 @@ public class TsFileSplitter {
allChunkData.addAll(entry.getValue());
}
for (ChunkData chunkData : allChunkData) {
+ chunkData.setSplitId(splitIdGenerator.incrementAndGet());
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
throw new IllegalStateException(
String.format(
@@ -370,6 +383,7 @@ public class TsFileSplitter {
}
private void consumeChunkData(String measurement, long offset, ChunkData
chunkData) {
+ chunkData.setSplitId(splitIdGenerator.incrementAndGet());
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
throw new IllegalStateException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 793e6139693..6e9cc5bee33 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -111,4 +111,12 @@ public class LoadTsFileNode extends WritePlanNode {
public int hashCode() {
return Objects.hash(resources);
}
+
+ public List<TsFileResource> getResources() {
+ return resources;
+ }
+
+ public TsFileResource lastResource() {
+ return resources.get(resources.size() - 1);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 620920a01a8..f44c01e0b8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -70,7 +70,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
private final ExecutorService executor;
private final boolean isGeneratedByPipe;
- private static final String NODE_CONNECTION_ERROR = "can't connect to node
{}";
+ public static final String NODE_CONNECTION_ERROR = "can't connect to node
{}";
public LoadTsFileDispatcherImpl(
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 834829c3849..cf93395cbf6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -22,12 +22,9 @@ package org.apache.iotdb.db.queryengine.plan.scheduler.load;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.partition.DataPartition;
-import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -36,8 +33,8 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
-import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
+import
org.apache.iotdb.db.queryengine.execution.load.DataPartitionBatchFetcher;
+import org.apache.iotdb.db.queryengine.execution.load.TsFileDataManager;
import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
@@ -49,7 +46,6 @@ import
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.Pair;
import io.airlift.units.Duration;
import org.slf4j.Logger;
@@ -58,11 +54,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
@@ -70,8 +63,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* {@link LoadTsFileScheduler} is used for scheduling {@link
LoadSingleTsFileNode} and {@link
@@ -81,10 +72,11 @@ import java.util.stream.IntStream;
*
href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe">...</a>;
*/
public class LoadTsFileScheduler implements IScheduler {
+
private static final Logger logger =
LoggerFactory.getLogger(LoadTsFileScheduler.class);
public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // one day
- private static final long MAX_MEMORY_SIZE;
- private static final int TRANSMIT_LIMIT;
+ public static final long MAX_MEMORY_SIZE;
+ public static final int TRANSMIT_LIMIT;
static {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -96,6 +88,10 @@ public class LoadTsFileScheduler implements IScheduler {
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
}
+ public static long getMaxMemorySize() {
+ return MAX_MEMORY_SIZE;
+ }
+
private final MPPQueryContext queryContext;
private final QueryStateMachine stateMachine;
private final LoadTsFileDispatcherImpl dispatcher;
@@ -191,9 +187,14 @@ public class LoadTsFileScheduler implements IScheduler {
private boolean firstPhase(LoadSingleTsFileNode node) {
try {
- TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node);
+ TsFileDataManager tsFileDataManager =
+ new TsFileDataManager(
+ this::dispatchOnePieceNode,
+ node.getPlanNodeId(),
+ node.getTsFileResource().getTsFile(),
+ partitionFetcher);
new TsFileSplitter(
- node.getTsFileResource().getTsFile(),
tsFileDataManager::addOrSendTsFileData)
+ node.getTsFileResource().getTsFile(),
tsFileDataManager::addOrSendTsFileData, 0)
.splitTsFileByDataPartition();
if (!tsFileDataManager.sendAllTsFileData()) {
stateMachine.transitionToFailed(new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
@@ -216,8 +217,7 @@ public class LoadTsFileScheduler implements IScheduler {
return true;
}
- private boolean dispatchOnePieceNode(
- LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
+ public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode,
TRegionReplicaSet replicaSet) {
allReplicaSets.add(replicaSet);
FragmentInstance instance =
new FragmentInstance(
@@ -361,154 +361,7 @@ public class LoadTsFileScheduler implements IScheduler {
ROLLBACK
}
- private static class TsFileDataManager {
- private final LoadTsFileScheduler scheduler;
- private final LoadSingleTsFileNode singleTsFileNode;
-
- private long dataSize;
- private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
- private final List<ChunkData> nonDirectionalChunkData;
-
- public TsFileDataManager(LoadTsFileScheduler scheduler,
LoadSingleTsFileNode singleTsFileNode) {
- this.scheduler = scheduler;
- this.singleTsFileNode = singleTsFileNode;
- this.dataSize = 0;
- this.replicaSet2Piece = new HashMap<>();
- this.nonDirectionalChunkData = new ArrayList<>();
- }
-
- private boolean addOrSendTsFileData(TsFileData tsFileData) {
- return tsFileData.isModification()
- ? addOrSendDeletionData(tsFileData)
- : addOrSendChunkData((ChunkData) tsFileData);
- }
-
- private boolean addOrSendChunkData(ChunkData chunkData) {
- nonDirectionalChunkData.add(chunkData);
- dataSize += chunkData.getDataSize();
-
- if (dataSize > MAX_MEMORY_SIZE) {
- routeChunkData();
-
- // start to dispatch from the biggest TsFilePieceNode
- List<TRegionReplicaSet> sortedReplicaSets =
- replicaSet2Piece.keySet().stream()
- .sorted(
- Comparator.comparingLong(o ->
replicaSet2Piece.get(o).getDataSize()).reversed())
- .collect(Collectors.toList());
-
- for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
- LoadTsFilePieceNode pieceNode =
replicaSet2Piece.get(sortedReplicaSet);
- if (pieceNode.getDataSize() == 0) { // total data size has been
reduced to 0
- break;
- }
- if (!scheduler.dispatchOnePieceNode(pieceNode, sortedReplicaSet)) {
- return false;
- }
-
- dataSize -= pieceNode.getDataSize();
- replicaSet2Piece.put(
- sortedReplicaSet,
- new LoadTsFilePieceNode(
- singleTsFileNode.getPlanNodeId(),
- singleTsFileNode
- .getTsFileResource()
- .getTsFile())); // can not just remove, because of
deletion
- if (dataSize <= MAX_MEMORY_SIZE) {
- break;
- }
- }
- }
-
- return true;
- }
-
- private void routeChunkData() {
- if (nonDirectionalChunkData.isEmpty()) {
- return;
- }
-
- List<TRegionReplicaSet> replicaSets =
- scheduler.partitionFetcher.queryDataPartition(
- nonDirectionalChunkData.stream()
- .map(data -> new Pair<>(data.getDevice(),
data.getTimePartitionSlot()))
- .collect(Collectors.toList()));
- IntStream.range(0, nonDirectionalChunkData.size())
- .forEach(
- i ->
- replicaSet2Piece
- .computeIfAbsent(
- replicaSets.get(i),
- o ->
- new LoadTsFilePieceNode(
- singleTsFileNode.getPlanNodeId(),
-
singleTsFileNode.getTsFileResource().getTsFile()))
- .addTsFileData(nonDirectionalChunkData.get(i)));
- nonDirectionalChunkData.clear();
- }
-
- private boolean addOrSendDeletionData(TsFileData deletionData) {
- routeChunkData(); // ensure chunk data will be added before deletion
-
- for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
- dataSize += deletionData.getDataSize();
- entry.getValue().addTsFileData(deletionData);
- }
- return true;
- }
-
- private boolean sendAllTsFileData() {
- routeChunkData();
-
- for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
- if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey()))
{
- logger.warn(
- "Dispatch piece node {} of TsFile {} error.",
- entry.getValue(),
- singleTsFileNode.getTsFileResource().getTsFile());
- return false;
- }
- }
- return true;
- }
- }
-
- private static class DataPartitionBatchFetcher {
- private final IPartitionFetcher fetcher;
-
- public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
- this.fetcher = fetcher;
- }
-
- public List<TRegionReplicaSet> queryDataPartition(
- List<Pair<String, TTimePartitionSlot>> slotList) {
- List<TRegionReplicaSet> replicaSets = new ArrayList<>();
- int size = slotList.size();
-
- for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
- List<Pair<String, TTimePartitionSlot>> subSlotList =
- slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
- DataPartition dataPartition =
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList));
- replicaSets.addAll(
- subSlotList.stream()
- .map(pair ->
dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right))
- .collect(Collectors.toList()));
- }
- return replicaSets;
- }
-
- private List<DataPartitionQueryParam> toQueryParam(
- List<Pair<String, TTimePartitionSlot>> slots) {
- return slots.stream()
- .collect(
- Collectors.groupingBy(
- Pair::getLeft, Collectors.mapping(Pair::getRight,
Collectors.toSet())))
- .entrySet()
- .stream()
- .map(
- entry ->
- new DataPartitionQueryParam(entry.getKey(), new
ArrayList<>(entry.getValue())))
- .collect(Collectors.toList());
- }
+ public DataPartitionBatchFetcher getPartitionFetcher() {
+ return partitionFetcher;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
index 3b5e0f8177e..e72a2e2a17d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
@@ -32,6 +32,12 @@ public class TimePartitionUtils {
return timePartitionSlot;
}
+ public static TTimePartitionSlot getTimePartition(long time, long
timePartitionInterval) {
+ TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
+ timePartitionSlot.setStartTime(time - time % timePartitionInterval);
+ return timePartitionSlot;
+ }
+
public static long getTimePartitionInterval() {
return timePartitionInterval;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
index 6461ce04585..1d31c90ccc9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
@@ -19,14 +19,19 @@
package org.apache.iotdb.db.queryengine.execution.load;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+
+import org.junit.Test;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
-import org.junit.Test;
public class MergedTsFileSplitterTest extends TestBase {
@@ -35,20 +40,30 @@ public class MergedTsFileSplitterTest extends TestBase {
@Test
public void testSplit() throws IOException {
long start = System.currentTimeMillis();
- MergedTsFileSplitter splitter = new MergedTsFileSplitter(files,
this::consumeSplit,
- IoTDBThreadPoolFactory.newThreadPool(16, Integer.MAX_VALUE, 20,
TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- new IoTThreadFactory("MergedTsFileSplitter"),
"MergedTsFileSplitter"));
+ MergedTsFileSplitter splitter =
+ new MergedTsFileSplitter(
+ files,
+ this::consumeSplit,
+ IoTDBThreadPoolFactory.newThreadPool(
+ 16,
+ Integer.MAX_VALUE,
+ 20,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new IoTThreadFactory("MergedTsFileSplitter"),
+ "MergedTsFileSplitter"),
+ TimePartitionUtils.getTimePartitionInterval());
try {
splitter.splitTsFileByDataPartition();
for (TsFileData tsFileData : resultSet) {
-// System.out.println(tsFileData);
+ // System.out.println(tsFileData);
}
} finally {
splitter.close();
}
- System.out.printf("%d splits after %dms\n", resultSet.size(),
- System.currentTimeMillis() - start);
+ System.out.printf(
+ "%d splits after %dms\n", resultSet.size(), System.currentTimeMillis()
- start);
+ assertEquals(resultSet.size(), expectedChunkNum());
}
public boolean consumeSplit(TsFileData data) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index cbbab9bd7c3..d10acdebffc 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -19,12 +19,34 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.IntStream;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.client.ClientManager;
+import
org.apache.iotdb.commons.client.ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -33,11 +55,35 @@ import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TMap;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolDecorator;
+import org.apache.thrift.protocol.TSet;
+import org.apache.thrift.protocol.TStruct;
+import org.apache.thrift.transport.TByteBuffer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
public class TestBase {
private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
@@ -47,19 +93,29 @@ public class TestBase {
public static final String TEST_TSFILE_PATH =
BASE_OUTPUT_PATH + "testTsFile".concat(File.separator) +
PARTIAL_PATH_STRING;
- protected int fileNum = 100;
+ protected int fileNum = new Random().nextInt(100) + 1;
// series number of each file, sn non-aligned series and 1 aligned series
with sn measurements
- protected int seriesNum = 100;
+ protected int seriesNum = 1;
// number of chunks of each series in a file, each series has only one chunk
in a file
protected double chunkTimeRangeRatio = 0.3;
// the interval between two consecutive points of a series
protected long pointInterval = 50_000;
- protected List<File> files = new ArrayList<>();
+ protected final List<File> files = new ArrayList<>();
+ protected final List<TsFileResource> tsFileResources = new ArrayList<>();
+ protected IPartitionFetcher partitionFetcher;
+ // the key is deviceId, not partitioned by time in the simple test
+ protected Map<String, TRegionReplicaSet> partitionTable = new HashMap<>();
+ protected Map<ConsensusGroupId, TRegionReplicaSet> groupId2ReplicaSetMap =
new HashMap<>();
+ protected IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
@Before
public void setup() throws IOException, WriteProcessException {
setupFiles();
- logger.info("Files set up");
+ logger.info("{} files set up", files.size());
+ partitionFetcher = dummyPartitionFetcher();
+ setupPartitionTable();
+ setupClientManager();
}
@After
@@ -69,48 +125,221 @@ public class TestBase {
}
}
- public void setupFiles() {
+ public int expectedChunkNum() {
+ double totalTimeRange = chunkTimeRangeRatio * fileNum;
+ int splitChunkNum = 0;
+ // if the boundary of the ith partition does not overlap a chunk, it
introduces an additional split
+ // TODO: due to floating-point precision, this calculation may have errors
+ for (int i = 0; i <= totalTimeRange; i++) {
+ if (i * 1.0 % chunkTimeRangeRatio > 0.00001) {
+ splitChunkNum += 1;
+ }
+ }
+ return (splitChunkNum + fileNum) * seriesNum * 2;
+ }
- IntStream.range(0, fileNum).parallel().forEach(i -> {
- try {
- File file = new File(getTestTsFilePath("root.sg1", 0, 0, i));
- synchronized (files) {
- files.add(file);
- }
-
- try (TsFileWriter writer = new TsFileWriter(file)) {
- // 3 non-aligned series under d1 and 1 aligned series with 3
measurements under d2
- for (int sn = 0; sn < seriesNum; sn++) {
- writer.registerTimeseries(
- new Path("d1"), new MeasurementSchema("s" + sn,
TSDataType.DOUBLE));
- }
- List<MeasurementSchema> alignedSchemas = new ArrayList<>();
- for (int sn = 0; sn < seriesNum; sn++) {
- alignedSchemas.add(new MeasurementSchema("s" + sn,
TSDataType.DOUBLE));
- }
- writer.registerAlignedTimeseries(new Path("d2"), alignedSchemas);
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint) {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode)
PlanNodeType.deserialize(req.body);
+ return new TLoadResp();
+ }
+
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint) {
+ return new TLoadResp();
+ }
+
+ public TProtocol dummyProtocol() throws TTransportException {
+ return new TBinaryProtocol(new TByteBuffer(ByteBuffer.allocate(0)));
+ }
+
+ public void setupClientManager() {
+ SyncDataNodeInternalServiceClientPoolFactory poolFactory =
+ new SyncDataNodeInternalServiceClientPoolFactory();
+ internalServiceClientManager =
+ new ClientManager<TEndPoint,
SyncDataNodeInternalServiceClient>(poolFactory) {
+ @Override
+ public SyncDataNodeInternalServiceClient borrowClient(TEndPoint
node) {
+ try {
+ return new SyncDataNodeInternalServiceClient(
+ dummyProtocol(),
+ new ThriftClientProperty.Builder().build(), node, this) {
+ @Override
+ public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) {
+ return handleTsFilePieceNode(req, getTEndpoint());
+ }
- long timePartitionInterval =
TimePartitionUtils.getTimePartitionInterval();
- long chunkTimeRange = (long) (timePartitionInterval *
chunkTimeRangeRatio);
- int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+ @Override
+ public TLoadResp sendLoadCommand(TLoadCommandReq req) {
+ return handleTsLoadCommand(req, getTEndpoint());
+ }
- for (int pn = 0; pn < chunkPointNum; pn++) {
- long currTime = chunkTimeRange * i + pointInterval * pn;
- TSRecord record = new TSRecord(currTime, "d1");
- for (int sn = 0; sn < seriesNum; sn++) {
- record.addTuple(new DoubleDataPoint("s" + sn, pn * 1.0));
+ @Override
+ public void close() {
+ }
+ };
+ } catch (TTransportException e) {
+ throw new RuntimeException(e);
}
- writer.write(record);
+ }
- record.deviceId = "d2";
- writer.writeAligned(record);
+ @Override
+ public void clear(TEndPoint node) {
}
- writer.flushAllChunkGroups();
- }
- } catch (IOException | WriteProcessException e) {
- throw new RuntimeException(e);
+
+ @Override
+ public void close() {
+ }
+ };
+ }
+
+ public void setupPartitionTable() {
+ ConsensusGroupId d1GroupId =
Factory.create(TConsensusGroupType.DataRegion.getValue(),
+ 0);
+ TRegionReplicaSet d1Replicas =
+ new TRegionReplicaSet(
+ d1GroupId.convertToTConsensusGroupId(),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(0)
+ .setInternalEndPoint(new TEndPoint("localhost", 10000)),
+ new TDataNodeLocation()
+ .setDataNodeId(1)
+ .setInternalEndPoint(new TEndPoint("localhost", 10001)),
+ new TDataNodeLocation()
+ .setDataNodeId(2)
+ .setInternalEndPoint(new TEndPoint("localhost", 10002))));
+ partitionTable.put("d1", d1Replicas);
+ groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
+
+ ConsensusGroupId d2GroupId =
Factory.create(TConsensusGroupType.DataRegion.getValue(), 1);
+ TRegionReplicaSet d2Replicas =
+ new TRegionReplicaSet(
+ d2GroupId.convertToTConsensusGroupId(),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(3)
+ .setInternalEndPoint(new TEndPoint("localhost", 10003)),
+ new TDataNodeLocation()
+ .setDataNodeId(4)
+ .setInternalEndPoint(new TEndPoint("localhost", 10004)),
+ new TDataNodeLocation()
+ .setDataNodeId(5)
+ .setInternalEndPoint(new TEndPoint("localhost", 10005))));
+ partitionTable.put("d2", d2Replicas);
+ groupId2ReplicaSetMap.put(d2GroupId, d2Replicas);
+ }
+
+ public IPartitionFetcher dummyPartitionFetcher() {
+ return new IPartitionFetcher() {
+ @Override
+ public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
+ return null;
+ }
+
+ @Override
+ public SchemaPartition getOrCreateSchemaPartition(PathPatternTree
patternTree) {
+ return null;
+ }
+
+ @Override
+ public DataPartition getDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return null;
+ }
+
+ @Override
+ public DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return null;
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return null;
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ return null;
+ }
+
+ @Override
+ public SchemaNodeManagementPartition
getSchemaNodeManagementPartitionWithLevel(
+ PathPatternTree patternTree, Integer level) {
+ return null;
+ }
+
+ @Override
+ public boolean updateRegionCache(TRegionRouteReq req) {
+ return false;
}
- });
+
+ @Override
+ public void invalidAllCache() {
+ }
+ };
+ }
+
+ public void setupFiles() {
+
+ IntStream.range(0, fileNum)
+ .parallel()
+ .forEach(
+ i -> {
+ try {
+ File file = new File(getTestTsFilePath("root.sg1", 0, 0, i));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ synchronized (files) {
+ files.add(file);
+ }
+
+ try (TsFileWriter writer = new TsFileWriter(file)) {
+ // sn non-aligned series under d1 and 1 aligned series with
sn measurements under
+ // d2
+ for (int sn = 0; sn < seriesNum; sn++) {
+ writer.registerTimeseries(
+ new Path("d1"), new MeasurementSchema("s" + sn,
TSDataType.DOUBLE));
+ }
+ List<MeasurementSchema> alignedSchemas = new ArrayList<>();
+ for (int sn = 0; sn < seriesNum; sn++) {
+ alignedSchemas.add(new MeasurementSchema("s" + sn,
TSDataType.DOUBLE));
+ }
+ writer.registerAlignedTimeseries(new Path("d2"),
alignedSchemas);
+
+ // one chunk for each series
+ long timePartitionInterval =
TimePartitionUtils.getTimePartitionInterval();
+ long chunkTimeRange = (long) (timePartitionInterval *
chunkTimeRangeRatio);
+ int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+
+ for (int pn = 0; pn < chunkPointNum; pn++) {
+ long currTime = chunkTimeRange * i + pointInterval * pn;
+ TSRecord record = new TSRecord(currTime, "d1");
+ for (int sn = 0; sn < seriesNum; sn++) {
+ record.addTuple(new DoubleDataPoint("s" + sn, pn * 1.0));
+ }
+ writer.write(record);
+
+ record.deviceId = "d2";
+ writer.writeAligned(record);
+ }
+ writer.flushAllChunkGroups();
+ tsFileResource.updateStartTime("d1", chunkTimeRange * i);
+ tsFileResource.updateStartTime("d2", chunkTimeRange * i);
+ tsFileResource.updateEndTime("d1", chunkTimeRange * (i + 1));
+ tsFileResource.updateEndTime("d2", chunkTimeRange * (i + 1));
+ }
+
+ tsFileResource.close();
+ synchronized (tsFileResources) {
+ tsFileResources.add(tsFileResource);
+ }
+ } catch (IOException | WriteProcessException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
public static String getTestTsFilePath(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
new file mode 100644
index 00000000000..dd8b20963ee
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.junit.Test;
+
+public class TsFileSplitSenderTest extends TestBase {
+
+ // the third key is UUid
+ protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File,
List<TsFileData>>>>>
+ phaseOneResults = new ConcurrentSkipListMap<>();
+ protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>>
phaseTwoResults =
+ new ConcurrentSkipListMap<>();
+
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint) {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode)
PlanNodeType.deserialize(
+ req.body.slice());
+ List<TsFileData> tsFileData =
+ phaseOneResults
+ .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>(
+ Comparator.comparingInt(ConsensusGroupId::getId)))
+ .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
+ .computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
+ .computeIfAbsent(pieceNode.getTsFile(), f -> new ArrayList<>());
+ synchronized (tsFileData) {
+ tsFileData.addAll(pieceNode.getAllTsFileData());
+ }
+
+ // forward to other replicas in the group
+ if (req.isRelay) {
+ req.isRelay = false;
+ TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+ for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
+ TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+ if (!otherPoint.equals(tEndpoint)) {
+ handleTsFilePieceNode(req, otherPoint);
+ }
+ }
+ }
+
+ return new TLoadResp().setAccepted(true)
+ .setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint) {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ phaseTwoResults
+ .computeIfAbsent(tEndpoint,
+ e -> new
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
+ .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
+ .computeIfAbsent(req.uuid, id -> req.commandType);
+
+ // forward to other replicas in the group
+ if (req.useConsensus) {
+ req.useConsensus = false;
+ TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+ for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
+ TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+ if (!otherPoint.equals(tEndpoint)) {
+ handleTsLoadCommand(req, otherPoint);
+ }
+ }
+ }
+
+ return new TLoadResp().setAccepted(true)
+ .setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
+ @Test
+ public void test() throws IOException {
+ LoadTsFileNode loadTsFileNode =
+ new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
+ DataPartitionBatchFetcher partitionBatchFetcher =
+ new DataPartitionBatchFetcher(partitionFetcher) {
+ @Override
+ public List<TRegionReplicaSet> queryDataPartition(
+ List<Pair<String, TTimePartitionSlot>> slotList) {
+ return slotList.stream()
+ .map(p -> partitionTable.get(p.left))
+ .collect(Collectors.toList());
+ }
+ };
+ TsFileSplitSender splitSender =
+ new TsFileSplitSender(
+ loadTsFileNode,
+ partitionBatchFetcher,
+ TimePartitionUtils.getTimePartitionInterval(),
+ internalServiceClientManager,
+ false);
+ long start = System.currentTimeMillis();
+ splitSender.start();
+ long timeConsumption = System.currentTimeMillis() - start;
+
+ printResult();
+ System.out.printf("Split ends after %dms", timeConsumption);
+ }
+
+ public void printResult() {
+ System.out.print("Phase one:\n");
+ for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File,
List<TsFileData>>>>>
+ tEndPointMapEntry : phaseOneResults.entrySet()) {
+ TEndPoint endPoint = tEndPointMapEntry.getKey();
+ for (Entry<ConsensusGroupId, Map<String, Map<File, List<TsFileData>>>>
+ consensusGroupIdMapEntry : tEndPointMapEntry.getValue().entrySet()) {
+ ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
+ for (Entry<String, Map<File, List<TsFileData>>> stringMapEntry :
+ consensusGroupIdMapEntry.getValue().entrySet()) {
+ String uuid = stringMapEntry.getKey();
+ for (Entry<File, List<TsFileData>> fileListEntry :
stringMapEntry.getValue().entrySet()) {
+ File tsFile = fileListEntry.getKey();
+ List<TsFileData> chunks = fileListEntry.getValue();
+ System.out.printf(
+ "%s - %s - %s - %s - %s chunks\n", endPoint, consensusGroupId,
uuid, tsFile, chunks.size());
+ assertEquals(chunks.size(), expectedChunkNum() / 2);
+ }
+ }
+ }
+ }
+
+ System.out.print("Phase two:\n");
+ for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>>
tEndPointMapEntry :
+ phaseTwoResults.entrySet()) {
+ TEndPoint endPoint = tEndPointMapEntry.getKey();
+ for (Entry<ConsensusGroupId, Map<String, Integer>>
consensusGroupIdMapEntry :
+ tEndPointMapEntry.getValue().entrySet()) {
+ ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
+ for (Entry<String, Integer> stringMapEntry :
+ consensusGroupIdMapEntry.getValue().entrySet()) {
+ String uuid = stringMapEntry.getKey();
+ int command = stringMapEntry.getValue();
+ System.out.printf("%s - %s - %s - %s\n", endPoint, consensusGroupId,
uuid,
+ LoadCommand.values()[command]);
+ assertEquals(command, LoadCommand.EXECUTE.ordinal());
+ }
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
index 78b61c8d664..62b5995b61b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
@@ -19,26 +19,33 @@
package org.apache.iotdb.db.queryengine.execution.load;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.junit.Test;
-public class TsFileSplitterTest extends TestBase{
+public class TsFileSplitterTest extends TestBase {
private List<TsFileData> resultSet = new ArrayList<>();
@Test
public void testSplit() throws IOException {
long start = System.currentTimeMillis();
+ int splitId = 0;
for (File file : files) {
- TsFileSplitter splitter = new TsFileSplitter(file, this::consumeSplit);
+ TsFileSplitter splitter = new TsFileSplitter(file, this::consumeSplit,
splitId + 1);
splitter.splitTsFileByDataPartition();
+ splitId = splitter.getCurrentSplitId();
}
for (TsFileData tsFileData : resultSet) {
// System.out.println(tsFileData);
}
- System.out.printf("%d splits after %dms\n", resultSet.size(),
System.currentTimeMillis() - start);
+ System.out.printf(
+ "%d/%d splits after %dms\n", resultSet.size(), expectedChunkNum(),
System.currentTimeMillis() - start);
+ assertEquals(resultSet.size(), expectedChunkNum());
}
public boolean consumeSplit(TsFileData data) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 3ee338d4ec7..f99b93674cc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -35,7 +35,7 @@ public class ClientManager<K, V> implements IClientManager<K,
V> {
private final KeyedObjectPool<K, V> pool;
- ClientManager(IClientPoolFactory<K, V> factory) {
+ protected ClientManager(IClientPoolFactory<K, V> factory) {
pool = factory.createClientPool(this);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 2b90b5b2661..91e2f908b9d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -65,6 +66,20 @@ public class SyncDataNodeInternalServiceClient extends
IDataNodeRPCService.Clien
getInputProtocol().getTransport().open();
}
  /**
   * Test-only constructor that wraps an externally created {@link TProtocol} instead of
   * building the transport from an endpoint, so tests can inject a mocked or in-memory
   * protocol. Mirrors the regular constructor's wiring: records the logging preference,
   * the target endpoint and the owning client manager, then opens the underlying transport.
   *
   * @param protocol the already-constructed protocol to drive the thrift client
   * @param property supplies {@code printLogWhenEncounterException}
   * @param endpoint the endpoint this client is considered connected to
   * @param clientManager the pool this client belongs to (used when returning the client)
   * @throws TTransportException if opening the protocol's transport fails
   */
  @TestOnly
  public SyncDataNodeInternalServiceClient(
      TProtocol protocol,
      ThriftClientProperty property,
      TEndPoint endpoint,
      ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
      throws TTransportException {
    super(protocol);
    this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
    this.endpoint = endpoint;
    this.clientManager = clientManager;
    getInputProtocol().getTransport().open();
  }
+
public int getTimeout() throws SocketException {
return ((TimeoutChangeableTransport)
getInputProtocol().getTransport()).getTimeOut();
}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 1268c399c5f..0cd4313e43f 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -308,12 +308,16 @@ struct TTsFilePieceReq{
1: required binary body
2: required string uuid
3: required common.TConsensusGroupId consensusGroupId
+ // if isRelay is true, the receiver should forward the request to other
replicas in the group
+ 4: optional bool isRelay
}
struct TLoadCommandReq{
  1: required i32 commandType
  2: required string uuid
  3: optional bool isGeneratedByPipe
  // if useConsensus is true, the receiver should replicate the command to the other
  // replicas of consensusGroupId (counterpart of isRelay in TTsFilePieceReq)
  4: optional bool useConsensus
  // the consensus group the command targets; presumably only meaningful when
  // useConsensus is set — TODO confirm against the receiver implementation
  5: optional common.TConsensusGroupId consensusGroupId
}
struct TLoadResp{