This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load_v2 by this push:
new 593dd83d683 add statistics
593dd83d683 is described below
commit 593dd83d683565bdf7d7954c1e197b0fde2fac79
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Sep 27 09:08:17 2023 +0800
add statistics
---
.../load/DeviceBatchTsFileDataManager.java | 16 +-
.../execution/load/MergedTsFileSplitter.java | 108 ++++--
.../execution/load/TsFileDataManager.java | 8 +-
.../execution/load/TsFileSplitSender.java | 369 ++++++++++++++-----
.../load/locseq/FixedLocationSequencer.java | 7 +-
.../execution/load/locseq/LocationSequencer.java | 6 +-
.../execution/load/locseq/LocationStatistics.java | 35 +-
.../load/locseq/RandomLocationSequencer.java | 6 +-
.../locseq/ThroughputBasedLocationSequencer.java | 66 ++--
.../nodesplit/ClusteringMeasurementSplitter.java | 401 ++++++++++++++++-----
.../load/nodesplit/OrderedMeasurementSplitter.java | 11 +-
.../load/nodesplit/PieceNodeSplitter.java | 3 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 1 -
.../execution/load/LoadTsFileSchedulerTest.java | 125 ++++---
.../execution/load/MergedTsFileSplitterTest.java | 7 +-
.../db/queryengine/execution/load/TestBase.java | 141 +++++---
.../execution/load/TsFileSplitSenderTest.java | 200 ++++++----
.../execution/load/TsFileSplitterTest.java | 7 +-
.../org/apache/iotdb/db/utils/SequenceUtils.java | 42 ++-
.../src/main/thrift/datanode.thrift | 2 +
20 files changed, 1078 insertions(+), 483 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
index d79bf322011..b343c0b4a06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
@@ -19,19 +19,21 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import java.io.File;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-/**
- * Like TsFileDataManager, but one batch (TsFilePieceNode) belongs to the same
device.
- */
-public class DeviceBatchTsFileDataManager extends TsFileDataManager{
+import java.io.File;
+
+/** Like TsFileDataManager, but one batch (TsFilePieceNode) belongs to the
same device. */
+public class DeviceBatchTsFileDataManager extends TsFileDataManager {
private String currentDeviceId;
- public DeviceBatchTsFileDataManager(DispatchFunction dispatchFunction,
+ public DeviceBatchTsFileDataManager(
+ DispatchFunction dispatchFunction,
PlanNodeId planNodeId,
- File targetFile, DataPartitionBatchFetcher partitionBatchFetcher, long
maxMemorySize) {
+ File targetFile,
+ DataPartitionBatchFetcher partitionBatchFetcher,
+ long maxMemorySize) {
super(dispatchFunction, planNodeId, targetFile, partitionBatchFetcher,
maxMemorySize);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
index 231b273c1d1..2f9464c0830 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
@@ -79,65 +79,99 @@ public class MergedTsFileSplitter {
private final List<File> tsFiles;
private final Function<TsFileData, Boolean> consumer;
private final PriorityQueue<SplitTask> taskPriorityQueue;
+ private final int maxConcurrentFileNum;
private ExecutorService asyncExecutor;
private long timePartitionInterval;
private AtomicInteger splitIdGenerator = new AtomicInteger();
+ private Statistic statistic = new Statistic();
public MergedTsFileSplitter(
List<File> tsFiles,
Function<TsFileData, Boolean> consumer,
ExecutorService asyncExecutor,
- long timePartitionInterval) {
+ long timePartitionInterval,
+ int maxConcurrentFileNum) {
this.tsFiles = tsFiles;
this.consumer = consumer;
this.asyncExecutor = asyncExecutor;
this.timePartitionInterval = timePartitionInterval;
+ this.maxConcurrentFileNum = maxConcurrentFileNum;
taskPriorityQueue = new PriorityQueue<>();
}
public void splitTsFileByDataPartition() throws IOException,
IllegalStateException {
- for (File tsFile : tsFiles) {
- SplitTask splitTask = new SplitTask(tsFile, asyncExecutor);
+ long startTime = System.nanoTime();
+ int i = 0;
+ for (; i < tsFiles.size(); i++) {
+ // only allow at most maxConcurrentFileNum files to be merged at the
same time
+ if (taskPriorityQueue.size() > maxConcurrentFileNum) {
+ break;
+ }
+
+ File tsFile = tsFiles.get(i);
+ SplitTask splitTask = new SplitTask(tsFile, asyncExecutor, i);
+ logger.info("Start to split {}", tsFiles.get(i));
if (splitTask.hasNext()) {
taskPriorityQueue.add(splitTask);
}
}
+ statistic.initTime = System.nanoTime() - startTime;
- List<SplitTask> equalTasks = new ArrayList<>();
while (!taskPriorityQueue.isEmpty()) {
+ startTime = System.nanoTime();
SplitTask task = taskPriorityQueue.poll();
- equalTasks.add(task);
- // find chunks of the same series in other files
- while (!taskPriorityQueue.isEmpty()) {
- if (taskPriorityQueue.peek().compareTo(task) == 0) {
- equalTasks.add(taskPriorityQueue.poll());
- } else {
- break;
- }
- }
+ TsFileData tsFileData = task.removeNext();
+ statistic.fetchDataTime += System.nanoTime() - startTime;
+ tsFileData.setSplitId(splitIdGenerator.incrementAndGet());
- for (SplitTask equalTask : equalTasks) {
- TsFileData tsFileData = equalTask.removeNext();
- tsFileData.setSplitId(splitIdGenerator.incrementAndGet());
- consumer.apply(tsFileData);
- if (equalTask.hasNext()) {
- taskPriorityQueue.add(equalTask);
+ startTime = System.nanoTime();
+ consumer.apply(tsFileData);
+ statistic.consumeTime += System.nanoTime() - startTime;
+
+
+ startTime = System.nanoTime();
+ if (task.hasNext()) {
+ taskPriorityQueue.add(task);
+ } else {
+ // when a file is exhausted, add the next non-empty file
+ for (; i < tsFiles.size(); i++) {
+ SplitTask splitTask = new SplitTask(tsFiles.get(i), asyncExecutor,
i);
+ logger.info("Start to split {}", tsFiles.get(i));
+ if (splitTask.hasNext()) {
+ taskPriorityQueue.add(splitTask);
+ i++;
+ break;
+ }
}
}
- equalTasks.clear();
+ statistic.enqueueTime += System.nanoTime() - startTime;
}
}
public void close() throws IOException {
+ logger.info("Init/FetchData/Consume/Enqueue Time: {}/{}/{}/{}ms"
+ , statistic.initTime / 1_000_000L
+ , statistic.fetchDataTime / 1_000_000L
+ , statistic.consumeTime / 1_000_000L
+ , statistic.enqueueTime / 1_000_000L);
for (SplitTask task : taskPriorityQueue) {
task.close();
}
taskPriorityQueue.clear();
+ logger.info("Splitter closed.");
+ }
+
+ private class Statistic {
+ private long initTime;
+ private long fetchDataTime;
+ private long consumeTime;
+ private long enqueueTime;
}
private class SplitTask implements Comparable<SplitTask> {
private final File tsFile;
+ private final int fileId;
private TsFileSequenceReader reader;
private final TreeMap<Long, List<Deletion>> offset2Deletions;
@@ -154,8 +188,9 @@ public class MergedTsFileSplitter {
private ExecutorService asyncExecutor;
private Future<Void> asyncTask;
- public SplitTask(File tsFile, ExecutorService asyncExecutor) throws
IOException {
+ public SplitTask(File tsFile, ExecutorService asyncExecutor, int fileId)
throws IOException {
this.tsFile = tsFile;
+ this.fileId = fileId;
this.asyncExecutor = asyncExecutor;
offset2Deletions = new TreeMap<>();
init();
@@ -182,7 +217,11 @@ public class MergedTsFileSplitter {
Comparator<ChunkData> chunkDataComparator =
Comparator.comparing(ChunkData::getDevice, String::compareTo)
.thenComparing(ChunkData::firstMeasurement, String::compareTo);
- return chunkDataComparator.compare(thisChunk, thatChunk);
+ int chunkCompare = chunkDataComparator.compare(thisChunk, thatChunk);
+ if (chunkCompare != 0) {
+ return chunkCompare;
+ }
+ return Integer.compare(this.fileId, o.fileId);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -225,7 +264,7 @@ public class MergedTsFileSplitter {
while (reader != null && !Thread.interrupted()) {
computeNext();
}
- logger.info("{} end splitting", tsFile);
+ logger.info("{} ends splitting", tsFile);
}
public boolean hasNext() throws IOException {
@@ -329,7 +368,6 @@ public class MergedTsFileSplitter {
}
ChunkHeader header = reader.readChunkHeader(marker);
- String measurementId = header.getMeasurementID();
if (header.getDataSize() == 0) {
throw new TsFileRuntimeException(
String.format(
@@ -539,7 +577,8 @@ public class MergedTsFileSplitter {
}
@Override
- public void writeToFileWriter(TsFileIOWriter writer) throws IOException
{}
+ public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
+ }
@Override
public boolean isModification() {
@@ -547,7 +586,8 @@ public class MergedTsFileSplitter {
}
@Override
- public void serialize(DataOutputStream stream) throws IOException {}
+ public void serialize(DataOutputStream stream) throws IOException {
+ }
@Override
public int getSplitId() {
@@ -555,7 +595,8 @@ public class MergedTsFileSplitter {
}
@Override
- public void setSplitId(int sid) {}
+ public void setSplitId(int sid) {
+ }
}
private void getAllModification(Map<Long, List<Deletion>>
offset2Deletions) throws IOException {
@@ -703,19 +744,6 @@ public class MergedTsFileSplitter {
}
}
- /**
- * handle empty page in aligned chunk, if uncompressedSize and
compressedSize are both 0, and
- * the statistics is null, then the page is empty.
- *
- * @param pageHeader page header
- * @return true if the page is empty
- */
- private boolean isEmptyPage(PageHeader pageHeader) {
- return pageHeader.getUncompressedSize() == 0
- && pageHeader.getCompressedSize() == 0
- && pageHeader.getStatistics() == null;
- }
-
private TsPrimitiveType[] decodeValuePage(
TsFileSequenceReader reader,
ChunkHeader chunkHeader,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
index b59b84a0d4b..bf1c79172b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.execution.load;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
-import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -101,8 +100,7 @@ public class TsFileDataManager {
// start to dispatch from the biggest TsFilePieceNode
List<TRegionReplicaSet> sortedReplicaSets =
replicaSet2Piece.keySet().stream()
- .sorted(
- Comparator.comparingLong(o ->
replicaSet2Piece.get(o).getDataSize()).reversed())
+ .sorted(Comparator.comparingLong(o ->
replicaSet2Piece.get(o).getDataSize()).reversed())
.collect(Collectors.toList());
for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
@@ -126,7 +124,6 @@ public class TsFileDataManager {
return true;
}
-
protected void routeChunkData() {
if (nonDirectionalChunkData.isEmpty()) {
return;
@@ -161,7 +158,8 @@ public class TsFileDataManager {
routeChunkData();
for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
- if (entry.getValue().getDataSize() > 0 &&
!dispatchFunction.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
+ if (entry.getValue().getDataSize() > 0
+ && !dispatchFunction.dispatchOnePieceNode(entry.getValue(),
entry.getKey())) {
logger.warn("Dispatch piece node {} of TsFile {} error.",
entry.getValue(), targetFile);
return false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index 6ef1b578325..2f09a003106 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -19,20 +19,10 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import static
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -49,7 +39,6 @@ import
org.apache.iotdb.db.queryengine.execution.load.locseq.ThroughputBasedLoca
import
org.apache.iotdb.db.queryengine.execution.load.nodesplit.ClusteringMeasurementSplitter;
import
org.apache.iotdb.db.queryengine.execution.load.nodesplit.OrderedMeasurementSplitter;
import
org.apache.iotdb.db.queryengine.execution.load.nodesplit.PieceNodeSplitter;
-import
org.apache.iotdb.db.queryengine.execution.load.nodesplit.PieceNodeSplitter.SingletonSplitter;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
@@ -58,15 +47,36 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.utils.Pair;
+
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
+
public class TsFileSplitSender {
private static final int MAX_RETRY = 5;
private static final long RETRY_INTERVAL_MS = 6_000L;
+ private static final int MAX_PENDING_PIECE_NODE = 5;
private static final Logger logger =
LoggerFactory.getLogger(TsFileSplitSender.class);
private LoadTsFileNode loadTsFileNode;
@@ -83,8 +93,13 @@ public class TsFileSplitSender {
new ConcurrentHashMap<>();
private Map<TRegionReplicaSet, Exception> phaseTwoFailures = new HashMap<>();
private long maxSplitSize;
- private PieceNodeSplitter pieceNodeSplitter = new
ClusteringMeasurementSplitter(10, 10);
-// private PieceNodeSplitter pieceNodeSplitter = new
OrderedMeasurementSplitter();
+ private PieceNodeSplitter pieceNodeSplitter = new
ClusteringMeasurementSplitter(1.0, 10);
+// private PieceNodeSplitter pieceNodeSplitter = new
OrderedMeasurementSplitter();
+ private CompressionType compressionType = CompressionType.LZ4;
+ private Statistic statistic = new Statistic();
+ private ExecutorService splitNodeService;
+ private Queue<Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet>>
splitFutures;
+ private int maxConcurrentFileNum;
public TsFileSplitSender(
LoadTsFileNode loadTsFileNode,
@@ -92,16 +107,21 @@ public class TsFileSplitSender {
long targetPartitionInterval,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager,
boolean isGeneratedByPipe,
- long maxSplitSize) {
+ long maxSplitSize,
+ int maxConcurrentFileNum) {
this.loadTsFileNode = loadTsFileNode;
this.targetPartitionFetcher = targetPartitionFetcher;
this.targetPartitionInterval = targetPartitionInterval;
this.internalServiceClientManager = internalServiceClientManager;
this.isGeneratedByPipe = isGeneratedByPipe;
this.maxSplitSize = maxSplitSize;
+ this.splitNodeService =
IoTDBThreadPoolFactory.newCachedThreadPool("SplitLoadTsFilePieceNode");
+ this.splitFutures = new ArrayDeque<>(MAX_PENDING_PIECE_NODE);
+ this.maxConcurrentFileNum = maxConcurrentFileNum;
}
public void start() throws IOException {
+ statistic.taskStartTime = System.currentTimeMillis();
// skip files without data
loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
uuid = UUID.randomUUID().toString();
@@ -113,10 +133,13 @@ public class TsFileSplitSender {
} else {
logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
}
+ statistic.taskEndTime = System.currentTimeMillis();
locationStatistics.logLocationStatistics();
+ statistic.logStatistic();
}
private boolean firstPhase(LoadTsFileNode node) throws IOException {
+ long start = System.currentTimeMillis();
TsFileDataManager tsFileDataManager =
new DeviceBatchTsFileDataManager(
this::dispatchOnePieceNode,
@@ -134,16 +157,22 @@ public class TsFileSplitSender {
new SynchronousQueue<>(),
new IoTThreadFactory("MergedTsFileSplitter"),
"MergedTsFileSplitter");
- MergedTsFileSplitter splitter = new MergedTsFileSplitter(
- node.getResources().stream()
- .map(TsFileResource::getTsFile)
- .collect(Collectors.toList()),
- tsFileDataManager::addOrSendTsFileData,
- executorService,
- targetPartitionInterval);
+ MergedTsFileSplitter splitter =
+ new MergedTsFileSplitter(
+ node.getResources().stream()
+ .map(TsFileResource::getTsFile)
+ .collect(Collectors.toList()),
+ tsFileDataManager::addOrSendTsFileData,
+ executorService,
+ targetPartitionInterval,
+ maxConcurrentFileNum);
splitter.splitTsFileByDataPartition();
splitter.close();
- return tsFileDataManager.sendAllTsFileData() && phaseOneFailures.isEmpty();
+ logger.info("Split ends after {}ms", System.currentTimeMillis() - start);
+ boolean success = tsFileDataManager.sendAllTsFileData() &&
processRemainingPieceNodes()
+ && phaseOneFailures.isEmpty();
+ logger.info("Cleanup ends after {}ms", System.currentTimeMillis() - start);
+ return success;
}
private boolean secondPhase(boolean isFirstPhaseSuccess) {
@@ -199,81 +228,237 @@ public class TsFileSplitSender {
}
public LocationSequencer createLocationSequencer(TRegionReplicaSet
replicaSet) {
-// return new FixedLocationSequencer(replicaSet);
-// return new RandomLocationSequencer(replicaSet);
+ // return new FixedLocationSequencer(replicaSet);
+ // return new RandomLocationSequencer(replicaSet);
return new ThroughputBasedLocationSequencer(replicaSet,
locationStatistics);
}
- public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode,
TRegionReplicaSet replicaSet) {
- allReplicaSets.add(replicaSet);
-
- long start = System.currentTimeMillis();
- List<LoadTsFilePieceNode> subNodes = pieceNodeSplitter.split(pieceNode);
- logger.info("{} splits are generated after {}ms", subNodes.size(),
System.currentTimeMillis() - start);
-
- List<Boolean> subNodeResults = subNodes.stream().parallel().map(node -> {
- long startTime = 0;
- TTsFilePieceReq loadTsFileReq =
- new TTsFilePieceReq(node.serializeToByteBuffer(), uuid,
replicaSet.getRegionId());
- loadTsFileReq.isRelay = true;
- LocationSequencer locationSequencer =
createLocationSequencer(replicaSet);
-
- boolean loadSucceed = false;
- Exception lastConnectionError = null;
- TDataNodeLocation currLocation = null;
- for (TDataNodeLocation location : locationSequencer) {
- if (location.getDataNodeId() == 0 && logger.isDebugEnabled()) {
- locationStatistics.logLocationStatistics();
- logger.info("Chose location {}", location.getDataNodeId());
- }
- currLocation = location;
- startTime = System.nanoTime();
- for (int i = 0; i < MAX_RETRY; i++) {
- try (SyncDataNodeInternalServiceClient client =
-
internalServiceClientManager.borrowClient(currLocation.internalEndPoint)) {
- TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
- logger.debug("Response from {}: {}", location.getDataNodeId(),
loadResp);
- if (!loadResp.isAccepted()) {
- logger.warn(loadResp.message);
- phaseOneFailures.put(
- new Pair<>(node, replicaSet),
- new FragmentInstanceDispatchException(loadResp.status));
- return false;
- }
- loadSucceed = true;
- break;
- } catch (ClientManagerException | TException e) {
- lastConnectionError = e;
- }
+ private ByteBuffer compressBuffer(ByteBuffer buffer) throws IOException {
+ statistic.rawSize.addAndGet(buffer.remaining());
+ if (compressionType.equals(CompressionType.UNCOMPRESSED)) {
+ statistic.compressedSize.addAndGet(buffer.remaining());
+ return buffer;
+ }
+ ICompressor compressor = ICompressor.getCompressor(compressionType);
+ int maxBytesForCompression =
compressor.getMaxBytesForCompression(buffer.remaining()) + 1;
+ ByteBuffer compressed = ByteBuffer.allocate(maxBytesForCompression);
+ int compressLength = compressor.compress(buffer.array(),
+ buffer.arrayOffset() + buffer.position(), buffer.remaining(),
compressed.array());
+ compressed.limit(compressLength);
+ statistic.compressedSize.addAndGet(compressLength);
+ return compressed;
+ }
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- return false;
- }
- }
+ private Future<List<LoadTsFilePieceNode>>
submitSplitPieceNode(LoadTsFilePieceNode pieceNode) {
+ return splitNodeService.submit(() -> pieceNodeSplitter.split(pieceNode));
+ }
- if (loadSucceed) {
- break;
- }
+ private boolean processRemainingPieceNodes() {
+ List<LoadTsFilePieceNode> subNodes;
+ for (Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet> pair :
splitFutures) {
+ try {
+ subNodes = pair.left.get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("Unexpected error during splitting node", e);
+ return false;
}
-
- if (!loadSucceed) {
- String warning = NODE_CONNECTION_ERROR;
- logger.warn(warning, currLocation, lastConnectionError);
- TSStatus status = new TSStatus();
- status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage(warning + currLocation);
- phaseOneFailures.put(
- new Pair<>(node, replicaSet), new
FragmentInstanceDispatchException(status));
+ if (!dispatchPieceNodes(subNodes, pair.right)) {
return false;
}
- long timeConsumption = System.nanoTime() - startTime;
- logger.debug("Time consumption: {}", timeConsumption);
- locationStatistics.updateThroughput(currLocation, node.getDataSize(),
timeConsumption);
- return true;
- }).collect(Collectors.toList());
+ }
+ return true;
+ }
+
+ private boolean dispatchPieceNodes(List<LoadTsFilePieceNode> subNodes,
+ TRegionReplicaSet replicaSet) {
+ AtomicLong minDispatchTime = new AtomicLong(Long.MAX_VALUE);
+ AtomicLong maxDispatchTime = new AtomicLong(Long.MIN_VALUE);
+ AtomicLong sumDispatchTime = new AtomicLong();
+ AtomicLong sumCompressingTime = new AtomicLong();
+
+ long start = System.nanoTime();
+ List<Boolean> subNodeResults =
+ subNodes.stream()
+ .parallel()
+ .map(
+ node -> {
+ ByteBuffer buffer;
+ long startTime = System.nanoTime();
+ int uncompressedLength;
+ try {
+ buffer = node.serializeToByteBuffer();
+ uncompressedLength = buffer.remaining();
+ buffer = compressBuffer(buffer);
+ } catch (IOException e) {
+ phaseOneFailures.put(new Pair<>(node, replicaSet), e);
+ return false;
+ }
+ long compressingTime = System.nanoTime() - startTime;
+ sumCompressingTime.addAndGet(compressingTime);
+ statistic.compressingTime.addAndGet(compressingTime);
+
+ TTsFilePieceReq loadTsFileReq =
+ new TTsFilePieceReq(buffer, uuid,
replicaSet.getRegionId());
+ loadTsFileReq.isRelay = true;
+
loadTsFileReq.setCompressionType(compressionType.serialize());
+ loadTsFileReq.setUncompressedLength(uncompressedLength);
+ LocationSequencer locationSequencer =
createLocationSequencer(replicaSet);
+
+ boolean loadSucceed = false;
+ Exception lastConnectionError = null;
+ TDataNodeLocation currLocation = null;
+ for (TDataNodeLocation location : locationSequencer) {
+ if (location.getDataNodeId() == 0 &&
logger.isDebugEnabled()) {
+ locationStatistics.logLocationStatistics();
+ logger.info("Chose location {}",
location.getDataNodeId());
+ }
+ currLocation = location;
+ startTime = System.nanoTime();
+ for (int i = 0; i < MAX_RETRY; i++) {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(
+ currLocation.internalEndPoint)) {
+ TLoadResp loadResp =
client.sendTsFilePieceNode(loadTsFileReq);
+ logger.debug("Response from {}: {}",
location.getDataNodeId(), loadResp);
+ if (!loadResp.isAccepted()) {
+ logger.warn(loadResp.message);
+ phaseOneFailures.put(
+ new Pair<>(node, replicaSet),
+ new
FragmentInstanceDispatchException(loadResp.status));
+ return false;
+ }
+ loadSucceed = true;
+ break;
+ } catch (ClientManagerException | TException e) {
+ lastConnectionError = e;
+ }
+
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ if (loadSucceed) {
+ break;
+ }
+ }
+
+ if (!loadSucceed) {
+ String warning = NODE_CONNECTION_ERROR;
+ logger.warn(warning, currLocation, lastConnectionError);
+ TSStatus status = new TSStatus();
+
status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage(warning + currLocation);
+ phaseOneFailures.put(
+ new Pair<>(node, replicaSet),
+ new FragmentInstanceDispatchException(status));
+ return false;
+ }
+ long timeConsumption = System.nanoTime() - startTime;
+ logger.debug("Time consumption: {}", timeConsumption);
+ locationStatistics.updateThroughput(
+ currLocation, node.getDataSize(), timeConsumption);
+
+ synchronized (maxDispatchTime) {
+ if (maxDispatchTime.get() < timeConsumption) {
+ maxDispatchTime.set(timeConsumption);
+ }
+ }
+ synchronized (minDispatchTime) {
+ if (minDispatchTime.get() > timeConsumption) {
+ minDispatchTime.set(timeConsumption);
+ }
+ }
+ sumDispatchTime.addAndGet(timeConsumption);
+ return true;
+ })
+ .collect(Collectors.toList());
+ long elapsedTime = System.nanoTime() - start;
+ statistic.dispatchNodesTime.addAndGet(elapsedTime);
+ logger.debug(
+ "Dispatched one node after {}ms, min/maxDispatching time: {}/{}ns,
avg: {}ns, sum: {}ns, compressing: {}ns",
+ elapsedTime / 1_000_000L, minDispatchTime.get(),
+ maxDispatchTime.get(), sumDispatchTime.get() / subNodes.size(),
sumDispatchTime.get(),
+ sumCompressingTime.get());
return !subNodeResults.contains(false);
}
+
+ public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode,
TRegionReplicaSet replicaSet) {
+ long allStart = System.nanoTime();
+ allReplicaSets.add(replicaSet);
+ if (false) {
+ long start = System.nanoTime();
+ List<LoadTsFilePieceNode> split = pieceNodeSplitter.split(pieceNode);
+ long elapsedTime = System.nanoTime() - start;
+ statistic.splitTime.addAndGet(elapsedTime);
+ statistic.pieceNodeNum.incrementAndGet();
+ logger.debug(
+ "{} splits are generated after {}ms", split.size(),
+ elapsedTime / 1_000_000L);
+ boolean success = dispatchPieceNodes(split, replicaSet);
+ statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+ return success;
+ }
+
+ List<LoadTsFilePieceNode> subNodes;
+ if (splitFutures.size() < MAX_PENDING_PIECE_NODE) {
+ splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode),
replicaSet));
+ statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+ return true;
+ } else {
+ long start = System.nanoTime();
+ Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet> pair =
splitFutures.poll();
+ try {
+ subNodes = pair.left.get();
+ long elapsedTime = System.nanoTime() - start;
+ statistic.splitTime.addAndGet(elapsedTime);
+ statistic.pieceNodeNum.incrementAndGet();
+ logger.debug(
+ "{} splits are generated after {}ms", subNodes.size(),
+ elapsedTime / 1_000_000L);
+
+ splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode),
replicaSet));
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("Unexpected error during splitting node", e);
+ return false;
+ }
+ boolean success = dispatchPieceNodes(subNodes, pair.right);
+ statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+ return success;
+ }
+ }
+
+ public static class Statistic {
+
+ public long taskStartTime;
+ public long taskEndTime;
+ public AtomicLong rawSize = new AtomicLong();
+ public AtomicLong compressedSize = new AtomicLong();
+ public AtomicLong splitTime = new AtomicLong();
+ public AtomicLong pieceNodeNum = new AtomicLong();
+ public AtomicLong dispatchNodesTime = new AtomicLong();
+ public AtomicLong dispatchNodeTime = new AtomicLong();
+ public AtomicLong compressingTime = new AtomicLong();
+
+ public void logStatistic() {
+ logger.info("Time consumption: {}ms", taskEndTime - taskStartTime);
+ logger.info(
+ "Generated {} piece nodes, splitTime: {}, dispatchSplitsTime: {},
dispatchNodeTime: {}",
+ pieceNodeNum.get(),
+ splitTime.get() / 1_000_000L, dispatchNodesTime.get() / 1_000_000L,
+ dispatchNodeTime.get() / 1_000_000L);
+ logger.info(
+ "Transmission size: {}/{} ({}), compressionTime: {}ms",
+ compressedSize.get(),
+ rawSize.get(),
+ compressedSize.get() * 1.0 / rawSize.get(),
+ compressingTime.get() / 1_000_000L);
+ }
+ }
+
+ public Statistic getStatistic() {
+ return statistic;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
index 6bd49396f0e..b53bc75753f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
@@ -19,13 +19,14 @@
package org.apache.iotdb.db.queryengine.execution.load.locseq;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-public class FixedLocationSequencer implements LocationSequencer{
+public class FixedLocationSequencer implements LocationSequencer {
private List<TDataNodeLocation> orderedLocations;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
index cb3d8723baf..ff1442a4678 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
@@ -19,10 +19,6 @@
package org.apache.iotdb.db.queryengine.execution.load.locseq;
-import java.util.Iterator;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-public interface LocationSequencer extends Iterable<TDataNodeLocation> {
-
-
-}
+public interface LocationSequencer extends Iterable<TDataNodeLocation> {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
index 88360b7a305..f4417174263 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
@@ -19,27 +19,27 @@
package org.apache.iotdb.db.queryengine.execution.load.locseq;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.tsfile.utils.Pair;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
public class LocationStatistics {
private static final Logger logger =
LoggerFactory.getLogger(LocationStatistics.class);
private Map<TDataNodeLocation, Statistic> locationStatisticMap = new
ConcurrentHashMap<>();
public Statistic getStatistic(TDataNodeLocation location) {
- return locationStatisticMap.computeIfAbsent(location,
- l -> new Statistic());
+ return locationStatisticMap.computeIfAbsent(location, l -> new
Statistic());
}
public void updateThroughput(TDataNodeLocation location, double sum, long
cnt) {
- Statistic statistic = locationStatisticMap.computeIfAbsent(location,
- l -> new Statistic());
+ Statistic statistic = locationStatisticMap.computeIfAbsent(location, l ->
new Statistic());
statistic.updateThroughput(sum, cnt);
statistic.increaseHit();
}
@@ -70,11 +70,14 @@ public class LocationStatistics {
@Override
public String toString() {
- return "{" +
- "throughput=" + getThroughput() +
- ", hit=" + hit +
- ", lastHitTime=" + lastHitTime +
- '}';
+ return "{"
+ + "throughput="
+ + getThroughput()
+ + ", hit="
+ + hit
+ + ", lastHitTime="
+ + lastHitTime
+ + '}';
}
public int getHit() {
@@ -88,9 +91,11 @@ public class LocationStatistics {
public void logLocationStatistics() {
if (logger.isInfoEnabled()) {
- logger.info("Location throughput: {}",
locationStatisticMap.entrySet().stream()
- .map(e -> new Pair<>(e.getKey().getDataNodeId(), e.getValue()))
- .collect(Collectors.toList()));
+ logger.info(
+ "Location throughput: {}",
+ locationStatisticMap.entrySet().stream()
+ .map(e -> new Pair<>(e.getKey().getDataNodeId(), e.getValue()))
+ .collect(Collectors.toList()));
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
index 8af95667865..85233c59de8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
@@ -19,16 +19,18 @@
package org.apache.iotdb.db.queryengine.execution.load.locseq;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
public class RandomLocationSequencer implements LocationSequencer {
private List<TDataNodeLocation> orderedLocations;
+
public RandomLocationSequencer(TRegionReplicaSet replicaSet) {
orderedLocations = new ArrayList<>(replicaSet.dataNodeLocations);
Collections.shuffle(orderedLocations);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
index 48cf1fa2c1b..3045db725c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
@@ -19,39 +19,43 @@
package org.apache.iotdb.db.queryengine.execution.load.locseq;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import
org.apache.iotdb.db.queryengine.execution.load.locseq.LocationStatistics.Statistic;
import org.apache.iotdb.tsfile.utils.Pair;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
public class ThroughputBasedLocationSequencer implements LocationSequencer {
- private static final Logger logger = LoggerFactory.getLogger(
- ThroughputBasedLocationSequencer.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(ThroughputBasedLocationSequencer.class);
private Random random = new Random();
private long resampleThresholdMS = 10000000;
private List<TDataNodeLocation> orderedLocations;
- public ThroughputBasedLocationSequencer(TRegionReplicaSet replicaSet,
- LocationStatistics locationStatistics) {
- List<Pair<TDataNodeLocation, Double>> locationRanks =
rankLocations(replicaSet,
- locationStatistics);
+ public ThroughputBasedLocationSequencer(
+ TRegionReplicaSet replicaSet, LocationStatistics locationStatistics) {
+ List<Pair<TDataNodeLocation, Double>> locationRanks =
+ rankLocations(replicaSet, locationStatistics);
orderedLocations = new ArrayList<>(locationRanks.size());
while (!locationRanks.isEmpty()) {
// the chosen location is removed from the list
orderedLocations.add(chooseNextLocation(locationRanks).left);
}
if (logger.isDebugEnabled()) {
- logger.debug("Location orders: {}",
-
orderedLocations.stream().map(TDataNodeLocation::getDataNodeId).collect(
- Collectors.toList()));
+ logger.debug(
+ "Location orders: {}",
+ orderedLocations.stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList()));
}
}
@@ -62,8 +66,8 @@ public class ThroughputBasedLocationSequencer implements
LocationSequencer {
* @param replicaSet replica set to be ranked
* @return the nodes and their ranks
*/
- private List<Pair<TDataNodeLocation, Double>>
rankLocations(TRegionReplicaSet replicaSet,
- LocationStatistics locationStatistics) {
+ private List<Pair<TDataNodeLocation, Double>> rankLocations(
+ TRegionReplicaSet replicaSet, LocationStatistics locationStatistics) {
List<Pair<TDataNodeLocation, Double>> locations =
new ArrayList<>(replicaSet.dataNodeLocations.size());
// retrieve throughput of each node
@@ -84,10 +88,14 @@ public class ThroughputBasedLocationSequencer implements
LocationSequencer {
totalThroughput += throughput;
}
if (logger.isInfoEnabled()) {
- logger.debug("Location throughput: {}",
- locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(),
p.right)).collect(
- Collectors.toList()));
- logger.debug("Total throughput: {}, first rank {}", totalThroughput,
+ logger.debug(
+ "Location throughput: {}",
+ locations.stream()
+ .map(p -> new Pair<>(p.left.getDataNodeId(), p.right))
+ .collect(Collectors.toList()));
+ logger.debug(
+ "Total throughput: {}, first rank {}",
+ totalThroughput,
locations.get(0).right / totalThroughput);
}
@@ -98,9 +106,11 @@ public class ThroughputBasedLocationSequencer implements
LocationSequencer {
location.right = location.right / totalThroughput + locations.get(i -
1).right;
}
if (logger.isInfoEnabled()) {
- logger.debug("Location ranks: {}",
- locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(),
p.right)).collect(
- Collectors.toList()));
+ logger.debug(
+ "Location ranks: {}",
+ locations.stream()
+ .map(p -> new Pair<>(p.left.getDataNodeId(), p.right))
+ .collect(Collectors.toList()));
}
return locations;
}
@@ -115,9 +125,13 @@ public class ThroughputBasedLocationSequencer implements
LocationSequencer {
}
}
if (chosen == 0 && locations.size() == 3 && logger.isDebugEnabled()) {
- logger.debug("Dice {}, chosen {}, ranks {}", dice, chosen,
- locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(),
p.right)).collect(
- Collectors.toList()));
+ logger.debug(
+ "Dice {}, chosen {}, ranks {}",
+ dice,
+ chosen,
+ locations.stream()
+ .map(p -> new Pair<>(p.left.getDataNodeId(), p.right))
+ .collect(Collectors.toList()));
}
Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
// update ranks
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
index b8f8d0eeb30..7c53a6295b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
@@ -1,5 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.OptionalDouble;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import net.ricecode.similarity.DiceCoefficientStrategy;
+import net.ricecode.similarity.JaroStrategy;
+import net.ricecode.similarity.LevenshteinDistanceStrategy;
+import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+
+import net.ricecode.similarity.JaroWinklerStrategy;
+import net.ricecode.similarity.SimilarityStrategy;
+import net.ricecode.similarity.StringSimilarityService;
+import net.ricecode.similarity.StringSimilarityServiceImpl;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -12,58 +57,167 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import net.ricecode.similarity.JaroWinklerStrategy;
-import net.ricecode.similarity.SimilarityStrategy;
-import net.ricecode.similarity.StringSimilarityService;
-import net.ricecode.similarity.StringSimilarityServiceImpl;
-import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClusteringMeasurementSplitter implements PieceNodeSplitter {
- private int numCluster;
+ private static final Logger logger =
LoggerFactory.getLogger(ClusteringMeasurementSplitter.class);
+ private double groupFactor;
private int maxIteration;
- private int dataSampleLength = 128;
-
- public ClusteringMeasurementSplitter(int numCluster, int maxIteration) {
- this.numCluster = numCluster;
+ private int dataSampleLength = 32;
+ private double stdErrThreshold = 1;
+ private long splitStartTime;
+ private long splitTimeBudget = 5;
+ private static final int MAX_CLUSTER_NUM = 500;
+
+ public ClusteringMeasurementSplitter(double groupFactor, int maxIteration) {
+ this.groupFactor = groupFactor;
this.maxIteration = maxIteration;
}
@Override
public List<LoadTsFilePieceNode> split(LoadTsFilePieceNode pieceNode) {
+ splitStartTime = System.currentTimeMillis();
if (pieceNode.isHasModification()) {
- // the order of modifications should be preserved, so with modifications
clustering cannot be used
+ // the order of modifications should be preserved, so with modifications
clustering cannot be
+ // used
return new OrderedMeasurementSplitter().split(pieceNode);
}
// split by measurement first
- Map<String, LoadTsFilePieceNode> measurementPieceNodeMap = new HashMap<>();
+ Map<String, LoadTsFilePieceNode> measurementPieceNodeMap = new HashMap<>(
+ pieceNode.getAllTsFileData().size());
for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
ChunkData chunkData = (ChunkData) tsFileData;
String currMeasurement = chunkData.firstMeasurement();
- LoadTsFilePieceNode pieceNodeSplit =
measurementPieceNodeMap.computeIfAbsent(
- currMeasurement,
- m -> new LoadTsFilePieceNode(pieceNode.getPlanNodeId(),
pieceNode.getTsFile()));
+ LoadTsFilePieceNode pieceNodeSplit =
+ measurementPieceNodeMap.computeIfAbsent(
+ currMeasurement,
+ m -> new LoadTsFilePieceNode(pieceNode.getPlanNodeId(),
pieceNode.getTsFile()));
pieceNodeSplit.addTsFileData(chunkData);
}
// use clustering to merge similar measurements
- return clusterPieceNode(measurementPieceNodeMap);
+ List<LoadTsFilePieceNode> loadTsFilePieceNodes =
clusterPieceNode(measurementPieceNodeMap);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Split distribution before refinement: {}",
+ loadTsFilePieceNodes.stream().map(l -> l.getAllTsFileData().size())
+ .collect(Collectors.toList()));
+ refineClusters(loadTsFilePieceNodes);
+ logger.debug("Split distribution after refinement: {}",
+ loadTsFilePieceNodes.stream().map(l -> l.getAllTsFileData().size())
+ .collect(Collectors.toList()));
+ }
+ return loadTsFilePieceNodes;
+ }
+
+ private void refineClusters(List<LoadTsFilePieceNode> pieceNodes) {
+ double average =
pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average()
+ .orElse(0.0);
+ double finalAverage1 = average;
+ double stderr =
Math.sqrt(pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
+ .mapToDouble(s -> (s - finalAverage1) * (s - finalAverage1)).sum() /
pieceNodes.size());
+ logger.debug("{} splits before refinement, average/stderr: {}/{}",
pieceNodes.size(), average,
+ stderr);
+
+ while (stderr > average * stdErrThreshold
+ && System.currentTimeMillis() - splitStartTime < splitTimeBudget) {
+
pieceNodes.sort(Comparator.comparingLong(LoadTsFilePieceNode::getDataSize));
+ LoadTsFilePieceNode smallestPiece = pieceNodes.get(0);
+ LoadTsFilePieceNode largestPiece = pieceNodes.get(pieceNodes.size() - 1);
+ double smallDiff = average - smallestPiece.getDataSize();
+ double largeDiff = largestPiece.getDataSize() - average;
+ if (smallDiff > largeDiff) {
+ mergeSmallPiece(pieceNodes);
+ } else {
+ if (!splitLargePiece(pieceNodes)) {
+ // the largest node may not be splittable (only one series), merge
the smallest instead
+ mergeSmallPiece(pieceNodes);
+ }
+ }
+
+ average =
pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average()
+ .orElse(0.0);
+ double finalAverage = average;
+ stderr =
Math.sqrt(pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
+ .mapToDouble(s -> (s - finalAverage) * (s - finalAverage)).sum() /
pieceNodes.size());
+ logger.debug("{} splits, average/stderr: {}/{}", pieceNodes.size(),
average, stderr);
+ }
+ logger.debug("{} splits after refinement, average/stderr: {}/{}",
pieceNodes.size(), average,
+ stderr);
+ }
+
+ private void mergeSmallPiece(List<LoadTsFilePieceNode> pieceNodes) {
+ LoadTsFilePieceNode pieceA = pieceNodes.remove(0);
+ LoadTsFilePieceNode pieceB = pieceNodes.remove(0);
+ LoadTsFilePieceNode newPiece = new
LoadTsFilePieceNode(pieceA.getPlanNodeId(),
+ pieceA.getTsFile());
+ pieceA.getAllTsFileData().forEach(newPiece::addTsFileData);
+ pieceB.getAllTsFileData().forEach(newPiece::addTsFileData);
+ pieceNodes.add(newPiece);
+ pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
+ .average().orElse(0.0);
+ pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
+ .sum();
+ }
+
+ private boolean splitLargePiece(List<LoadTsFilePieceNode> pieceNodes) {
+ LoadTsFilePieceNode oldPiece = pieceNodes.remove(pieceNodes.size() - 1);
+ LoadTsFilePieceNode newNodeA = new
LoadTsFilePieceNode(oldPiece.getPlanNodeId(),
+ oldPiece.getTsFile());
+ LoadTsFilePieceNode newNodeB = new
LoadTsFilePieceNode(oldPiece.getPlanNodeId(),
+ oldPiece.getTsFile());
+ long sizeTarget = oldPiece.getDataSize() / 2;
+
+ // cannot break a series into two pieces since the chunk order must be
preserved
+ String currMeasurement = null;
+ List<TsFileData> allTsFileData = oldPiece.getAllTsFileData();
+ int i = 0;
+ for (; i < allTsFileData.size(); i++) {
+ TsFileData tsFileData = allTsFileData.get(i);
+ if (tsFileData.isModification()) {
+ // modifications follow the previous chunk data
+ newNodeA.addTsFileData(tsFileData);
+ } else {
+ ChunkData chunkData = (ChunkData) tsFileData;
+ if (currMeasurement == null ||
currMeasurement.equals(chunkData.firstMeasurement())) {
+ // the first chunk or chunk of the same series, add it to A
+ currMeasurement = chunkData.firstMeasurement();
+ newNodeA.addTsFileData(tsFileData);
+ sizeTarget -= chunkData.getDataSize();
+ } else {
+ // chunk of the next series
+ if (sizeTarget < 0) {
+ // splitA is full, break to fill splitB
+ break;
+ } else {
+ // splitA is not full, also add this series to A
+ currMeasurement = chunkData.firstMeasurement();
+ newNodeA.addTsFileData(tsFileData);
+ sizeTarget -= chunkData.getDataSize();
+ }
+ }
+ }
+ }
+ // add remaining series to B
+ for (; i < allTsFileData.size(); i++) {
+ TsFileData tsFileData = allTsFileData.get(i);
+ newNodeB.addTsFileData(tsFileData);
+ }
+
+ pieceNodes.add(newNodeA);
+ if (!newNodeB.getAllTsFileData().isEmpty()) {
+ pieceNodes.add(newNodeB);
+ return true;
+ }
+ return false;
}
private List<LoadTsFilePieceNode> clusterPieceNode(
Map<String, LoadTsFilePieceNode> measurementPieceNodeMap) {
// convert to feature vector
- Map<String, SeriesFeatureVector> measurementVectorMap = new HashMap<>(
- measurementPieceNodeMap.size());
+ Map<String, SeriesFeatureVector> measurementVectorMap =
+ new HashMap<>(measurementPieceNodeMap.size());
for (Entry<String, LoadTsFilePieceNode> entry :
measurementPieceNodeMap.entrySet()) {
measurementVectorMap.put(entry.getKey(),
convertToFeature(entry.getValue()));
}
@@ -74,6 +228,10 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
doubleVectors.put(e.getKey(), e.getValue().numericVector);
}
// clustering
+ int numCluster = Math.min((int) (doubleVectors.size() / groupFactor),
MAX_CLUSTER_NUM);
+ if (numCluster < 1) {
+ numCluster = 1;
+ }
VectorDistance distance = new EuclideanDistance();
Clustering clustering = new KMeans(numCluster, maxIteration);
List<List<String>> clusterResult = clustering.cluster(doubleVectors,
distance);
@@ -84,8 +242,8 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
continue;
}
LoadTsFilePieceNode pieceNode =
measurementPieceNodeMap.get(cluster.get(0));
- LoadTsFilePieceNode clusterNode = new
LoadTsFilePieceNode(pieceNode.getPlanNodeId(),
- pieceNode.getTsFile());
+ LoadTsFilePieceNode clusterNode =
+ new LoadTsFilePieceNode(pieceNode.getPlanNodeId(),
pieceNode.getTsFile());
for (String measurementId : cluster) {
pieceNode = measurementPieceNodeMap.get(measurementId);
for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
@@ -128,22 +286,36 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
maxNumOfPages = Math.max(maxNumOfPages, vector.numOfPages);
}
- SimilarityStrategy strategy = new JaroWinklerStrategy();
+ SimilarityStrategy strategy = new LevenshteinDistanceStrategy();
StringSimilarityService service = new
StringSimilarityServiceImpl(strategy);
- for (SeriesFeatureVector vector : vectors) {
- vector.numericVector[0] = service.score(firstMeasurementId,
vector.measurementId);
- vector.numericVector[1] = (vector.dataSize - minDataSize) * 1.0 /
(maxDataSize - minDataSize);
+ int finalMinDataSize = minDataSize;
+ String finalFirstMeasurementId = firstMeasurementId;
+ int finalMaxDataSize = maxDataSize;
+ int finalMinDataType = minDataType;
+ int finalMaxDataType = maxDataType;
+ int finalMinCompressionType = minCompressionType;
+ int finalMaxCompressionType = maxCompressionType;
+ int finalMinEncodingType = minEncodingType;
+ int finalMaxEncodingType = maxEncodingType;
+ int finalMinNumOfPages = minNumOfPages;
+ int finalMaxNumOfPages = maxNumOfPages;
+ String finalFirstDataSample = firstDataSample;
+ vectors.stream().parallel().forEach(vector -> {
+ vector.numericVector[0] = service.score(finalFirstMeasurementId,
vector.measurementId);
+ vector.numericVector[1] = (vector.dataSize - finalMinDataSize) * 1.0 /
(finalMaxDataSize - finalMinDataSize);
vector.numericVector[2] =
- (vector.dataType.ordinal() - minDataType) * 1.0 / (maxDataType -
minDataType);
+ (vector.dataType.ordinal() - finalMinDataType) * 1.0 /
(finalMaxDataType - finalMinDataType);
vector.numericVector[3] =
- (vector.compressionType.ordinal() - minCompressionType) * 1.0 /
(maxCompressionType
- - minCompressionType);
+ (vector.compressionType.ordinal() - finalMinCompressionType)
+ * 1.0
+ / (finalMaxCompressionType - finalMinCompressionType);
vector.numericVector[4] =
- (vector.encodingType.ordinal() - minEncodingType) * 1.0 /
(maxEncodingType
- - minEncodingType);
+ (vector.encodingType.ordinal() - finalMinEncodingType)
+ * 1.0
+ / (finalMaxEncodingType - finalMinEncodingType);
vector.numericVector[5] =
- (vector.numOfPages - minNumOfPages) * 1.0 / (maxNumOfPages -
minNumOfPages);
- vector.numericVector[6] = service.score(firstDataSample,
vector.dataSample);
+ (vector.numOfPages - finalMinNumOfPages) * 1.0 / (finalMaxNumOfPages
- finalMinNumOfPages);
+ vector.numericVector[6] = service.score(finalFirstDataSample,
vector.dataSample);
double[] numericVector = vector.numericVector;
for (int i = 0; i < numericVector.length; i++) {
if (Double.isNaN(numericVector[i])) {
@@ -152,7 +324,7 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
numericVector[i] = 1.0;
}
}
- }
+ });
}
private SeriesFeatureVector convertToFeature(LoadTsFilePieceNode pieceNode) {
@@ -222,8 +394,11 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
vector.compressionType = chunkHeader.getCompressionType();
vector.encodingType = chunkHeader.getEncodingType();
vector.numOfPages = chunkHeader.getNumOfPages();
- vector.dataSample = new String(chunkBuffer.array(),
- chunkBuffer.arrayOffset() + chunkBuffer.position(),
chunkBuffer.remaining());
+ vector.dataSample =
+ new String(
+ chunkBuffer.array(),
+ chunkBuffer.arrayOffset() + chunkBuffer.position(),
+ chunkBuffer.remaining());
return vector;
}
@@ -252,9 +427,9 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
public double calculate(double[] v1, double[] v2) {
double sum = 0;
for (int i = 0; i < v1.length; i++) {
- sum += Math.pow(v1[i] - v2[i], 2);
+ sum += (v1[i] - v2[i]) * (v1[i] - v2[i]);
}
- return Math.sqrt(sum);
+ return Math.sqrt(sum) + 1;
}
}
@@ -263,11 +438,12 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
List<List<String>> cluster(Map<String, double[]> tagVectorMap,
VectorDistance distance);
}
- public static class KMeans implements Clustering {
+ public class KMeans implements Clustering {
private int k;
private int maxIteration;
private double[][] centroids;
+ private AtomicInteger[] centroidCounters;
private Random random;
private Map<Entry<String, double[]>, Integer> recordCentroidMapping;
private int vecLength = 0;
@@ -276,10 +452,20 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
this.k = k;
this.maxIteration = maxIteration;
this.centroids = new double[k][];
+ this.centroidCounters = new AtomicInteger[k];
+ for (int i = 0; i < centroidCounters.length; i++) {
+ centroidCounters[i] = new AtomicInteger();
+ }
this.random = new Random();
this.recordCentroidMapping = new ConcurrentHashMap<>();
}
+ private void clearCentroidCounter() {
+ for (AtomicInteger centroidCounter : centroidCounters) {
+ centroidCounter.set(0);
+ }
+ }
+
@Override
public List<List<String>> cluster(Map<String, double[]> tagVectorMap,
VectorDistance distance) {
recordCentroidMapping.clear();
@@ -299,70 +485,93 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
break;
}
newCentroid();
+ clearCentroidCounter();
+ if (System.currentTimeMillis() - splitStartTime > splitTimeBudget) {
+ break;
+ }
}
- Map<Integer, List<Entry<String, double[]>>> centroidRecordMap =
collectCentroidRecordMapping();
+ Map<Integer, List<Entry<String, double[]>>> centroidRecordMap =
+ collectCentroidRecordMapping();
return centroidRecordMap.values().stream()
- .map(l ->
l.stream().map(Entry::getKey).collect(Collectors.toList())).collect(
- Collectors.toList());
+ .map(l -> l.stream().map(Entry::getKey).collect(Collectors.toList()))
+ .collect(Collectors.toList());
}
private Map<Integer, List<Entry<String, double[]>>>
collectCentroidRecordMapping() {
Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping = new
ConcurrentHashMap<>();
- recordCentroidMapping.entrySet().stream().parallel()
- .forEach(e -> centroidRecordMapping.compute(e.getValue(), (key,
oldV) -> {
- if (oldV == null) {
- oldV = new ArrayList<>();
- }
- oldV.add(e.getKey());
- return oldV;
- }));
+ recordCentroidMapping.entrySet().stream()
+ .parallel()
+ .forEach(
+ e ->
+ centroidRecordMapping.compute(
+ e.getValue(),
+ (key, oldV) -> {
+ if (oldV == null) {
+ oldV = new ArrayList<>();
+ }
+ oldV.add(e.getKey());
+ return oldV;
+ }));
return centroidRecordMapping;
}
private void newCentroid() {
- Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping =
collectCentroidRecordMapping();
- centroidRecordMapping.entrySet().stream().parallel().forEach(e -> {
- Integer centroidId = e.getKey();
- List<Entry<String, double[]>> records = e.getValue();
- int recordNum = records.size();
- double[] sumVec = new double[vecLength];
- for (Entry<String, double[]> record : records) {
- for (int i = 0; i < sumVec.length; i++) {
- sumVec[i] += record.getValue()[i];
- }
- }
- for (int i = 0; i < sumVec.length; i++) {
- sumVec[i] = sumVec[i] / recordNum;
- }
- centroids[centroidId] = sumVec;
- });
+ Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping =
+ collectCentroidRecordMapping();
+ centroidRecordMapping.entrySet().stream()
+ .parallel()
+ .forEach(
+ e -> {
+ Integer centroidId = e.getKey();
+ List<Entry<String, double[]>> records = e.getValue();
+ int recordNum = records.size();
+ double[] sumVec = new double[vecLength];
+ for (Entry<String, double[]> record : records) {
+ for (int i = 0; i < sumVec.length; i++) {
+ sumVec[i] += record.getValue()[i];
+ }
+ }
+ for (int i = 0; i < sumVec.length; i++) {
+ sumVec[i] = sumVec[i] / recordNum;
+ }
+ centroids[centroidId] = sumVec;
+ });
}
private boolean assignCentroid(Map<String, double[]> tagVectorMap,
VectorDistance distance) {
AtomicBoolean centroidUpdated = new AtomicBoolean(false);
- tagVectorMap.entrySet().stream().parallel().forEach(e -> {
- double[] vector = e.getValue();
- double minDist = Double.MAX_VALUE;
- int nearestCentroid = 0;
- for (int i = 0; i < centroids.length; i++) {
- double dist = distance.calculate(vector, centroids[i]);
- if (dist < minDist) {
- minDist = dist;
- nearestCentroid = i;
- }
- }
-
- int finalNearestCentroid = nearestCentroid;
- recordCentroidMapping.compute(e, (t, oc) -> {
- if (oc == null || oc != finalNearestCentroid) {
- centroidUpdated.set(true);
- return finalNearestCentroid;
- } else {
- return oc;
- }
- });
- });
+ tagVectorMap.entrySet().stream()
+ .parallel()
+ .forEach(
+ e -> {
+ double[] vector = e.getValue();
+ double minDist = Double.MAX_VALUE;
+ int nearestCentroid = 0;
+ for (int i = 0; i < centroids.length; i++) {
+ double dist =
+ distance.calculate(vector, centroids[i]) * (1
+ + centroidCounters[i].get() * 0.1);
+ if (dist < minDist) {
+ minDist = dist;
+ nearestCentroid = i;
+ }
+ }
+
+ centroidCounters[nearestCentroid].incrementAndGet();
+ int finalNearestCentroid = nearestCentroid;
+ recordCentroidMapping.put(e, finalNearestCentroid);
+ recordCentroidMapping.compute(
+ e,
+ (t, oc) -> {
+ if (oc == null || oc != finalNearestCentroid) {
+ centroidUpdated.set(true);
+ return finalNearestCentroid;
+ } else {
+ return oc;
+ }
+ });
+ });
return centroidUpdated.get();
}
@@ -372,7 +581,7 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
}
private void pickRandomCentroid(Map<String, double[]> tagVectorMap) {
- List<double[]> recordVectors =
tagVectorMap.values().stream().collect(Collectors.toList());
+ List<double[]> recordVectors = new ArrayList<>(tagVectorMap.values());
Collections.shuffle(recordVectors);
for (int i = 0; i < k; i++) {
centroids[i] = recordVectors.get(i);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/OrderedMeasurementSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/OrderedMeasurementSplitter.java
index dc3b2526f91..9081e8f52de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/OrderedMeasurementSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/OrderedMeasurementSplitter.java
@@ -19,23 +19,26 @@
package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
-public class OrderedMeasurementSplitter implements PieceNodeSplitter{
+import java.util.ArrayList;
+import java.util.List;
+
+public class OrderedMeasurementSplitter implements PieceNodeSplitter {
/**
* Split a piece node by series.
+ *
* @return a list of piece nodes, each associated to only one series.
*/
@Override
public List<LoadTsFilePieceNode> split(LoadTsFilePieceNode pieceNode) {
List<LoadTsFilePieceNode> result = new ArrayList<>();
String currMeasurement = null;
- LoadTsFilePieceNode currNode = new
LoadTsFilePieceNode(pieceNode.getPlanNodeId(), pieceNode.getTsFile());
+ LoadTsFilePieceNode currNode =
+ new LoadTsFilePieceNode(pieceNode.getPlanNodeId(),
pieceNode.getTsFile());
result.add(currNode);
for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
if (tsFileData.isModification()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/PieceNodeSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/PieceNodeSplitter.java
index 6aebc82d7cd..efa45edfea6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/PieceNodeSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/PieceNodeSplitter.java
@@ -19,9 +19,10 @@
package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+
import java.util.Collections;
import java.util.List;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
public interface PieceNodeSplitter {
List<LoadTsFilePieceNode> split(LoadTsFilePieceNode pieceNode);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index d10130d9e2c..2eb3500eaa8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -36,7 +36,6 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
import
org.apache.iotdb.db.queryengine.execution.load.DataPartitionBatchFetcher;
import org.apache.iotdb.db.queryengine.execution.load.TsFileDataManager;
import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
-import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
index dbbbfea41bd..9f7a713b15d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
@@ -19,21 +19,7 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Collectors;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -57,40 +43,63 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.rpc.TSStatusCode;
+
import org.junit.Test;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
public class LoadTsFileSchedulerTest extends TestBase {
protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File,
Set<Integer>>>>>
phaseOneResults = new ConcurrentSkipListMap<>();
// the third key is UUID, the value is command type
- protected Map<TEndPoint, Map<String, Integer>> phaseTwoResults =
- new ConcurrentSkipListMap<>();
+ protected Map<TEndPoint, Map<String, Integer>> phaseTwoResults = new
ConcurrentSkipListMap<>();
@Test
public void test() {
MPPQueryContext context = new MPPQueryContext(QueryId.MOCK_QUERY_ID);
PlanFragmentId fragmentId = new
PlanFragmentId("load_tsfile_scheduler_test", 0);
- SubPlan subPlan = new SubPlan(
- new PlanFragment(fragmentId, null));
+ SubPlan subPlan = new SubPlan(new PlanFragment(fragmentId, null));
List<FragmentInstance> fragmentInstanceList = new ArrayList<>();
for (int i = 0; i < tsFileResources.size(); i++) {
TsFileResource tsFileResource = tsFileResources.get(i);
- LoadSingleTsFileNode singleTsFileNode = new LoadSingleTsFileNode(
- new PlanNodeId("load_tsfile_scheduler_test" + (i + 1)),
tsFileResource, false);
- fragmentInstanceList.add(new FragmentInstance(
- new PlanFragment(fragmentId, singleTsFileNode),
- new FragmentInstanceId(fragmentId, "" + i),
- null, null, 0, null));
+ LoadSingleTsFileNode singleTsFileNode =
+ new LoadSingleTsFileNode(
+ new PlanNodeId("load_tsfile_scheduler_test" + (i + 1)),
tsFileResource, false);
+ fragmentInstanceList.add(
+ new FragmentInstance(
+ new PlanFragment(fragmentId, singleTsFileNode),
+ new FragmentInstanceId(fragmentId, "" + i),
+ null,
+ null,
+ 0,
+ null));
}
- DistributedQueryPlan queryPlan = new DistributedQueryPlan(context,
subPlan, null,
- fragmentInstanceList);
-
- LoadTsFileScheduler scheduler = new LoadTsFileScheduler(queryPlan, context,
- new QueryStateMachine(context.getQueryId(),
-
IoTDBThreadPoolFactory.newCachedThreadPool("LoadTsFileSchedulerTest")),
- internalServiceClientManager, dummyDataPartitionBatchFetcher(), false);
+ DistributedQueryPlan queryPlan =
+ new DistributedQueryPlan(context, subPlan, null, fragmentInstanceList);
+
+ LoadTsFileScheduler scheduler =
+ new LoadTsFileScheduler(
+ queryPlan,
+ context,
+ new QueryStateMachine(
+ context.getQueryId(),
+
IoTDBThreadPoolFactory.newCachedThreadPool("LoadTsFileSchedulerTest")),
+ internalServiceClientManager,
+ dummyDataPartitionBatchFetcher(),
+ false);
long start = System.currentTimeMillis();
scheduler.start();
@@ -103,29 +112,33 @@ public class LoadTsFileSchedulerTest extends TestBase {
public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint) {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
- LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode)
PlanNodeType.deserialize(
- req.body.slice());
+ LoadTsFilePieceNode pieceNode =
+ (LoadTsFilePieceNode) PlanNodeType.deserialize(req.body.slice());
Set<Integer> splitIds =
phaseOneResults
- .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>(
- Comparator.comparingInt(ConsensusGroupId::getId)))
+ .computeIfAbsent(
+ tEndpoint,
+ e -> new
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
.computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
.computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
.computeIfAbsent(pieceNode.getTsFile(), f -> new
ConcurrentSkipListSet<>());
-
splitIds.addAll(pieceNode.getAllTsFileData().stream().map(TsFileData::getSplitId).collect(
- Collectors.toList()));
+ splitIds.addAll(
+ pieceNode.getAllTsFileData().stream()
+ .map(TsFileData::getSplitId)
+ .collect(Collectors.toList()));
- return new TLoadResp().setAccepted(true)
+ return new TLoadResp()
+ .setAccepted(true)
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint) {
phaseTwoResults
- .computeIfAbsent(tEndpoint,
- e -> new ConcurrentSkipListMap<>())
+ .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>())
.computeIfAbsent(req.uuid, id -> req.commandType);
- return new TLoadResp().setAccepted(true)
+ return new TLoadResp()
+ .setAccepted(true)
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@@ -134,8 +147,8 @@ public class LoadTsFileSchedulerTest extends TestBase {
for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File,
Set<Integer>>>>>
tEndPointMapEntry : phaseOneResults.entrySet()) {
TEndPoint endPoint = tEndPointMapEntry.getKey();
- for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>
- consensusGroupIdMapEntry : tEndPointMapEntry.getValue().entrySet()) {
+ for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>
consensusGroupIdMapEntry :
+ tEndPointMapEntry.getValue().entrySet()) {
ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
int chunkNum = 0;
int fileNum = 0;
@@ -150,27 +163,25 @@ public class LoadTsFileSchedulerTest extends TestBase {
}
}
System.out.printf(
- "%s - %s - %s tasks - %s files - %s chunks\n", endPoint,
consensusGroupId, taskNum, fileNum, chunkNum);
-// if (consensusGroupId.getId() == 0) {
-// // d1, non-aligned series
-// assertEquals(expectedChunkNum() / 2, chunkNum);
-// } else {
-// // d2, aligned series
-// assertEquals(expectedChunkNum() / 2 / seriesNum, chunkNum);
-// }
+ "%s - %s - %s tasks - %s files - %s chunks\n",
+ endPoint, consensusGroupId, taskNum, fileNum, chunkNum);
+ // if (consensusGroupId.getId() == 0) {
+ // // d1, non-aligned series
+ // assertEquals(expectedChunkNum() / 2, chunkNum);
+ // } else {
+ // // d2, aligned series
+ // assertEquals(expectedChunkNum() / 2 / seriesNum, chunkNum);
+ // }
}
}
System.out.print("Phase two:\n");
- for (Entry<TEndPoint, Map<String, Integer>> tEndPointMapEntry :
- phaseTwoResults.entrySet()) {
+ for (Entry<TEndPoint, Map<String, Integer>> tEndPointMapEntry :
phaseTwoResults.entrySet()) {
TEndPoint endPoint = tEndPointMapEntry.getKey();
- for (Entry<String, Integer> stringMapEntry :
- tEndPointMapEntry.getValue().entrySet()) {
+ for (Entry<String, Integer> stringMapEntry :
tEndPointMapEntry.getValue().entrySet()) {
String uuid = stringMapEntry.getKey();
int command = stringMapEntry.getValue();
- System.out.printf("%s - %s - %s\n", endPoint, uuid,
- LoadCommand.values()[command]);
+ System.out.printf("%s - %s - %s\n", endPoint, uuid,
LoadCommand.values()[command]);
assertEquals(LoadCommand.EXECUTE.ordinal(), command);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
index 1d31c90ccc9..71efb778a47 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import static org.junit.Assert.assertEquals;
-
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.db.utils.TimePartitionUtils;
@@ -33,6 +31,8 @@ import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
+
public class MergedTsFileSplitterTest extends TestBase {
private List<TsFileData> resultSet = new ArrayList<>();
@@ -52,7 +52,8 @@ public class MergedTsFileSplitterTest extends TestBase {
new SynchronousQueue<>(),
new IoTThreadFactory("MergedTsFileSplitter"),
"MergedTsFileSplitter"),
- TimePartitionUtils.getTimePartitionInterval());
+ TimePartitionUtils.getTimePartitionInterval(),
+ fileNum);
try {
splitter.splitTsFileByDataPartition();
for (TsFileData tsFileData : resultSet) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index aaa64b1e978..fab94c1751b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -19,16 +19,10 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
+
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -60,14 +54,19 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -78,6 +77,17 @@ import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
public class TestBase {
private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
@@ -87,16 +97,21 @@ public class TestBase {
public static final String TEST_TSFILE_PATH =
BASE_OUTPUT_PATH + "testTsFile".concat(File.separator) +
PARTIAL_PATH_STRING;
- protected int fileNum = 500;
+ protected int fileNum = 100;
// series number of each file, sn non-aligned series and 1 aligned series
with sn measurements
protected int seriesNum = 1000;
+ protected int deviceNum = 100;
// number of chunks of each series in a file, each series has only one chunk
in a file
- protected double chunkTimeRangeRatio = 0.03;
+ protected double chunkTimeRangeRatio = 0.001;
// the interval between two consecutive points of a series
protected long pointInterval = 50_000;
- protected List<DoubleSequenceGeneratorFactory> sequenceGeneratorFactories =
Arrays.asList(
- new SimpleDoubleSequenceGenerator.Factory(), new
UniformDoubleSequenceGenerator.Factory(),
- new GaussianDoubleSequenceGenerator.Factory());
+ protected List<Pair<String, DoubleSequenceGeneratorFactory>>
measurementSequenceGeneratorPairs = Arrays.asList(
+ new Pair<>("Simple_", new SimpleDoubleSequenceGenerator.Factory()),
+ new Pair<>("UniformA_", new UniformDoubleSequenceGenerator.Factory(1.0)),
+ new Pair<>("GaussianA_", new
GaussianDoubleSequenceGenerator.Factory(1.0, 1.0)),
+ new Pair<>("UniformB_", new
UniformDoubleSequenceGenerator.Factory(10.0)),
+ new Pair<>("GaussianB_", new
GaussianDoubleSequenceGenerator.Factory(15.0, 3.0))
+ );
protected final List<File> files = new ArrayList<>();
protected final List<TsFileResource> tsFileResources = new ArrayList<>();
protected IPartitionFetcher partitionFetcher;
@@ -107,6 +122,8 @@ public class TestBase {
internalServiceClientManager;
// the third key is UUID, the fourth key is targetFile
+ private int groupSizeInByte;
+
@Before
public void setup() throws IOException, WriteProcessException {
setupFiles();
@@ -114,6 +131,8 @@ public class TestBase {
partitionFetcher = dummyPartitionFetcher();
setupPartitionTable();
setupClientManager();
+ groupSizeInByte =
TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte();
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte((int) (GB));
}
@After
@@ -121,12 +140,14 @@ public class TestBase {
for (File file : files) {
file.delete();
}
+
TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(groupSizeInByte);
}
public int expectedChunkNum() {
double totalTimeRange = chunkTimeRangeRatio * fileNum;
int splitChunkNum = 0;
- // if the boundary of the ith partition does not overlap a chunk, it
introduces an additional split
+ // if the boundary of the ith partition does not overlap a chunk, it
introduces an additional
+ // split
// TODO: due to machine precision, the calculation may have error. Also,
if the data amount is
// too large, there could be more than one chunk for each series in the
file.
for (int i = 0; i <= totalTimeRange; i++) {
@@ -138,13 +159,15 @@ public class TestBase {
}
public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint)
- throws TException {
- return new TLoadResp().setAccepted(true)
+ throws TException, IOException {
+ return new TLoadResp()
+ .setAccepted(true)
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint) {
- return new TLoadResp().setAccepted(true)
+ return new TLoadResp()
+ .setAccepted(true)
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@@ -161,11 +184,14 @@ public class TestBase {
public SyncDataNodeInternalServiceClient borrowClient(TEndPoint
node) {
try {
return new SyncDataNodeInternalServiceClient(
- dummyProtocol(),
- new ThriftClientProperty.Builder().build(), node, this) {
+ dummyProtocol(), new ThriftClientProperty.Builder().build(),
node, this) {
@Override
public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req)
throws TException {
- return handleTsFilePieceNode(req, getTEndpoint());
+ try {
+ return handleTsFilePieceNode(req, getTEndpoint());
+ } catch (IOException e) {
+ throw new TException(e);
+ }
}
@Override
@@ -193,8 +219,7 @@ public class TestBase {
}
public void setupPartitionTable() {
- ConsensusGroupId d1GroupId =
Factory.create(TConsensusGroupType.DataRegion.getValue(),
- 0);
+ ConsensusGroupId d1GroupId =
Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
TRegionReplicaSet d1Replicas =
new TRegionReplicaSet(
d1GroupId.convertToTConsensusGroupId(),
@@ -208,9 +233,11 @@ public class TestBase {
new TDataNodeLocation()
.setDataNodeId(2)
.setInternalEndPoint(new TEndPoint("localhost", 10002))));
- partitionTable.put("d1", d1Replicas);
- groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
+ for (int i = 0; i < deviceNum; i++) {
+ partitionTable.put("d" + i, d1Replicas);
+ }
+ groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
ConsensusGroupId d2GroupId =
Factory.create(TConsensusGroupType.DataRegion.getValue(), 1);
TRegionReplicaSet d2Replicas =
new TRegionReplicaSet(
@@ -225,7 +252,7 @@ public class TestBase {
new TDataNodeLocation()
.setDataNodeId(5)
.setInternalEndPoint(new TEndPoint("localhost", 10005))));
- partitionTable.put("d2", d2Replicas);
+ partitionTable.put("dd1", d2Replicas);
groupId2ReplicaSetMap.put(d2GroupId, d2Replicas);
}
@@ -287,15 +314,28 @@ public class TestBase {
@Override
public List<TRegionReplicaSet> queryDataPartition(
List<Pair<String, TTimePartitionSlot>> slotList) {
- return slotList.stream()
- .map(p -> partitionTable.get(p.left))
- .collect(Collectors.toList());
+ return slotList.stream().map(p ->
partitionTable.get(p.left)).collect(Collectors.toList());
}
};
}
public void setupFiles() {
-
+
measurementSequenceGeneratorPairs.sort(Comparator.comparing(Pair::getLeft));
+ List<Pair<MeasurementSchema, DoubleSequenceGeneratorFactory>>
schemaGeneratorPairs = new ArrayList<>();
+ for (int sn = 0; sn < seriesNum; sn++) {
+ Pair<String, DoubleSequenceGeneratorFactory> measurementGeneratorPair =
measurementSequenceGeneratorPairs.get(
+ sn % measurementSequenceGeneratorPairs.size());
+ MeasurementSchema measurementSchema =
+ new MeasurementSchema(measurementGeneratorPair
+ .left + sn,
+ TSDataType.DOUBLE);
+ measurementSchema.setCompressor(CompressionType.LZ4.serialize());
+ measurementSchema.setEncoding(TSEncoding.PLAIN.serialize());
+ schemaGeneratorPairs.add(new Pair<>(measurementSchema,
measurementGeneratorPair.right));
+ }
+ schemaGeneratorPairs.sort(Comparator.comparing(s ->
s.left.getMeasurementId()));
+ List<MeasurementSchema> measurementSchemas =
schemaGeneratorPairs.stream().map(m -> m.left).collect(
+ Collectors.toList());
IntStream.range(0, fileNum)
.parallel()
.forEach(
@@ -309,45 +349,47 @@ public class TestBase {
try (TsFileWriter writer = new TsFileWriter(file)) {
// sn non-aligned series under d1 and 1 aligned series with
sn measurements under
- // d2
- List<MeasurementSchema> measurementSchemas = new
ArrayList<>();
+ // dd2
for (int sn = 0; sn < seriesNum; sn++) {
- MeasurementSchema measurementSchema = new
MeasurementSchema("s" + sn,
- TSDataType.DOUBLE);
- writer.registerTimeseries(
- new Path("d1"), measurementSchema);
- measurementSchemas.add(measurementSchema);
+ for (int dn = 0; dn < deviceNum; dn++) {
+ writer.registerTimeseries(new Path("d"+dn),
schemaGeneratorPairs.get(sn).left);
+ }
}
- writer.registerAlignedTimeseries(new Path("d2"),
measurementSchemas);
+ writer.registerAlignedTimeseries(new Path("dd1"),
measurementSchemas);
// one chunk for each series
long timePartitionInterval =
TimePartitionUtils.getTimePartitionInterval();
long chunkTimeRange = (long) (timePartitionInterval *
chunkTimeRangeRatio);
int chunkPointNum = (int) (chunkTimeRange / pointInterval);
- Tablet tablet = new Tablet("d1", measurementSchemas,
chunkPointNum);
+ Tablet tablet = new Tablet("d0", measurementSchemas,
chunkPointNum);
for (int sn = 0; sn < seriesNum; sn++) {
- DoubleSequenceGenerator sequenceGenerator =
sequenceGeneratorFactories.get(
- sn % sequenceGeneratorFactories.size()).create();
+ DoubleSequenceGenerator sequenceGenerator =
+ schemaGeneratorPairs.get(sn).right
+ .create();
for (int pn = 0; pn < chunkPointNum; pn++) {
if (sn == 0) {
long currTime = chunkTimeRange * i + pointInterval *
pn;
tablet.addTimestamp(pn, currTime);
}
- tablet.addValue("s" + sn, pn, sequenceGenerator.gen(pn));
+
tablet.addValue(schemaGeneratorPairs.get(sn).left.getMeasurementId(), pn,
+ sequenceGenerator.gen(pn));
}
}
tablet.rowSize = chunkPointNum;
- writer.write(tablet);
+ for (int dn = 0; dn < deviceNum; dn++) {
+ tablet.deviceId = "d" + dn;
+ writer.write(tablet);
+ }
tablet.deviceId = "d2";
- //writer.writeAligned(tablet);
+ // writer.writeAligned(tablet);
writer.flushAllChunkGroups();
- tsFileResource.updateStartTime("d1", chunkTimeRange * i);
- tsFileResource.updateStartTime("d2", chunkTimeRange * i);
- tsFileResource.updateEndTime("d1", chunkTimeRange * (i + 1));
- tsFileResource.updateEndTime("d2", chunkTimeRange * (i + 1));
+ for (int dn = 0; dn < deviceNum; dn++) {
+ tsFileResource.updateStartTime("d"+dn, chunkTimeRange * i);
+ tsFileResource.updateEndTime("d"+dn, chunkTimeRange * (i +
1));
+ }
}
tsFileResource.close();
@@ -358,6 +400,7 @@ public class TestBase {
throw new RuntimeException(e);
}
});
+ tsFileResources.sort(Comparator.comparingLong(TsFileResource::getVersion));
}
public static String getTestTsFilePath(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index 819daa9d66b..818e3f80d25 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -19,19 +19,7 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -47,12 +35,31 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.utils.Pair;
+
import org.apache.thrift.TException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+import static org.junit.Assert.assertEquals;
+
public class TsFileSplitSenderTest extends TestBase {
private static final Logger logger =
LoggerFactory.getLogger(TsFileSplitSenderTest.class);
@@ -69,18 +76,38 @@ public class TsFileSplitSenderTest extends TestBase {
// simulating jvm stall like GC
private long minStuckIntervalMS = 50000;
private long maxStuckIntervalMS = 100000;
- private long stuckDurationMS = 10000;
+ private long stuckDurationMS = 0;
- private long nodeThroughput = 500000;
+ private long nodeThroughput = 10_000;
protected Map<TEndPoint, Pair<Long, Long>> nextStuckTimeMap = new
ConcurrentHashMap<>();
+ private AtomicLong sumHandleTime = new AtomicLong();
+ private AtomicLong decompressTime = new AtomicLong();
+ private AtomicLong deserializeTime = new AtomicLong();
+ private AtomicLong relayTime = new AtomicLong();
+ private AtomicLong maxMemoryUsage = new AtomicLong();
@Test
public void test() throws IOException {
+ Thread thread = new Thread(() -> {
+ while (!Thread.interrupted()) {
+ long preUsage = maxMemoryUsage.get();
+ long newUsage = Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory();
+ if (preUsage < newUsage) {
+ maxMemoryUsage.set(newUsage);
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ });
+ thread.start();
+
LoadTsFileNode loadTsFileNode =
new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
- DataPartitionBatchFetcher partitionBatchFetcher =
- dummyDataPartitionBatchFetcher();
+ DataPartitionBatchFetcher partitionBatchFetcher =
dummyDataPartitionBatchFetcher();
TsFileSplitSender splitSender =
new TsFileSplitSender(
loadTsFileNode,
@@ -88,30 +115,44 @@ public class TsFileSplitSenderTest extends TestBase {
TimePartitionUtils.getTimePartitionInterval(),
internalServiceClientManager,
false,
- maxSplitSize);
+ maxSplitSize,
+ 100);
long start = System.currentTimeMillis();
splitSender.start();
long timeConsumption = System.currentTimeMillis() - start;
printPhaseResult();
- System.out.printf("Split ends after %dms", timeConsumption);
+ long transmissionTime = splitSender.getStatistic().compressedSize.get() /
nodeThroughput;
+ System.out.printf("Split ends after %dms + %dms (Transmission) = %dms\n",
timeConsumption,
+ transmissionTime, timeConsumption + transmissionTime);
+ System.out.printf("Handle sum %dns\n", sumHandleTime.get());
+ System.out.printf("Decompress sum %dns\n", decompressTime.get());
+ System.out.printf("Deserialize sum %dns\n", deserializeTime.get());
+ System.out.printf("Relay sum %dns\n", relayTime.get());
+ System.out.printf("Memory usage %dMB\n", maxMemoryUsage.get() / MB);
}
public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint)
- throws TException {
- if ((tEndpoint.getPort() - 10000) % 3 == 0 && random.nextDouble() <
packetLossRatio
+ throws TException, IOException {
+ long handleStart = System.nanoTime();
+ if ((tEndpoint.getPort() - 10000) % 3 == 0
+ && random.nextDouble() < packetLossRatio
&& req.isRelay) {
throw new TException("Packet lost");
}
- if ((tEndpoint.getPort() - 10000) % 3 == 1 && random.nextDouble() <
packetLossRatio / 2
+ if ((tEndpoint.getPort() - 10000) % 3 == 1
+ && random.nextDouble() < packetLossRatio / 2
&& req.isRelay) {
throw new TException("Packet lost");
}
- if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
- Pair<Long, Long> nextStuckTime =
nextStuckTimeMap.computeIfAbsent(tEndpoint,
- e -> new Pair<>(System.currentTimeMillis(),
- System.currentTimeMillis() + stuckDurationMS));
+ if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay &&
stuckDurationMS > 0) {
+ Pair<Long, Long> nextStuckTime =
+ nextStuckTimeMap.computeIfAbsent(
+ tEndpoint,
+ e ->
+ new Pair<>(
+ System.currentTimeMillis(), System.currentTimeMillis() +
stuckDurationMS));
long currTime = System.currentTimeMillis();
if (currTime >= nextStuckTime.left && currTime < nextStuckTime.right) {
logger.debug("Node{} stalls", tEndpoint.getPort() - 10000);
@@ -121,38 +162,52 @@ public class TsFileSplitSenderTest extends TestBase {
throw new RuntimeException(e);
}
} else if (currTime > nextStuckTime.right) {
- nextStuckTimeMap.compute(tEndpoint, (endPoint, newInterval) -> {
- if (newInterval != null && currTime < newInterval.right) {
- return newInterval;
- }
- long start = currTime + minStuckIntervalMS + random.nextInt(
- (int) (maxStuckIntervalMS - minStuckIntervalMS));
- return new Pair<>(start, start + stuckDurationMS);
- });
+ nextStuckTimeMap.compute(
+ tEndpoint,
+ (endPoint, newInterval) -> {
+ if (newInterval != null && currTime < newInterval.right) {
+ return newInterval;
+ }
+ long start =
+ currTime
+ + minStuckIntervalMS
+ + random.nextInt((int) (maxStuckIntervalMS -
minStuckIntervalMS));
+ return new Pair<>(start, start + stuckDurationMS);
+ });
}
}
+ long decompressStart = System.nanoTime();
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
- LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(
- req.body.slice());
+ ByteBuffer buf = req.body.slice();
+ if (req.isSetCompressionType()) {
+ CompressionType compressionType = CompressionType.deserialize(req.compressionType);
+ IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
+ int uncompressedLength = req.getUncompressedLength();
+ ByteBuffer allocate = ByteBuffer.allocate(uncompressedLength);
+ unCompressor.uncompress(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(),
+ allocate.array(), 0);
+ allocate.limit(uncompressedLength);
+ buf = allocate;
+ }
+ decompressTime.addAndGet(System.nanoTime() - decompressStart);
+
+ long deserializeStart = System.nanoTime();
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(buf);
+ deserializeTime.addAndGet(System.nanoTime() - deserializeStart);
Set<Integer> splitIds =
phaseOneResults
- .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>(
- Comparator.comparingInt(ConsensusGroupId::getId)))
+ .computeIfAbsent(
+ tEndpoint,
+ e -> new ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
.computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
.computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
.computeIfAbsent(pieceNode.getTsFile(), f -> new
ConcurrentSkipListSet<>());
- splitIds.addAll(pieceNode.getAllTsFileData().stream().map(TsFileData::getSplitId).collect(
- Collectors.toList()));
-
- synchronized (tEndpoint) {
- try {
- Thread.sleep(pieceNode.getDataSize() / nodeThroughput);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
+ splitIds.addAll(
+ pieceNode.getAllTsFileData().stream()
+ .map(TsFileData::getSplitId)
+ .collect(Collectors.toList()));
if (dummyDelayMS > 0) {
if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
@@ -173,17 +228,25 @@ public class TsFileSplitSenderTest extends TestBase {
// forward to other replicas in the group
if (req.isRelay) {
+ long relayStart = System.nanoTime();
req.isRelay = false;
TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
- for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ regionReplicaSet.getDataNodeLocations().stream().parallel().forEach(dataNodeLocation -> {
TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
if (!otherPoint.equals(tEndpoint)) {
- handleTsFilePieceNode(req, otherPoint);
+ try {
+ handleTsFilePieceNode(req, otherPoint);
+ } catch (TException | IOException e) {
+ throw new RuntimeException(e);
+ }
}
- }
+ });
+ relayTime.addAndGet(System.nanoTime() - relayStart);
}
- return new TLoadResp().setAccepted(true)
+ sumHandleTime.addAndGet(System.nanoTime() - handleStart);
+ return new TLoadResp()
+ .setAccepted(true)
.setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@@ -191,7 +254,8 @@ public class TsFileSplitSenderTest extends TestBase {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
phaseTwoResults
- .computeIfAbsent(tEndpoint,
+ .computeIfAbsent(
+ tEndpoint,
e -> new
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
.computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
.computeIfAbsent(req.uuid, id -> req.commandType);
@@ -208,7 +272,8 @@ public class TsFileSplitSenderTest extends TestBase {
}
}
- return new TLoadResp().setAccepted(true)
+ return new TLoadResp()
+ .setAccepted(true)
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@@ -217,8 +282,8 @@ public class TsFileSplitSenderTest extends TestBase {
for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File,
Set<Integer>>>>>
tEndPointMapEntry : phaseOneResults.entrySet()) {
TEndPoint endPoint = tEndPointMapEntry.getKey();
- for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>
- consensusGroupIdMapEntry : tEndPointMapEntry.getValue().entrySet()) {
+ for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>
consensusGroupIdMapEntry :
+ tEndPointMapEntry.getValue().entrySet()) {
ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
for (Entry<String, Map<File, Set<Integer>>> stringMapEntry :
consensusGroupIdMapEntry.getValue().entrySet()) {
@@ -227,15 +292,15 @@ public class TsFileSplitSenderTest extends TestBase {
File tsFile = fileListEntry.getKey();
Set<Integer> chunks = fileListEntry.getValue();
System.out.printf(
- "%s - %s - %s - %s - %s chunks\n", endPoint, consensusGroupId,
uuid, tsFile,
- chunks.size());
-// if (consensusGroupId.getId() == 0) {
-// // d1, non-aligned series
-// assertEquals(expectedChunkNum() / 2, chunks.size());
-// } else {
-// // d2, aligned series
-// assertEquals(expectedChunkNum() / 2 / seriesNum,
chunks.size());
-// }
+ "%s - %s - %s - %s - %s chunks\n",
+ endPoint, consensusGroupId, uuid, tsFile, chunks.size());
+ // if (consensusGroupId.getId() == 0) {
+ // // d1, non-aligned series
+ // assertEquals(expectedChunkNum() / 2,
chunks.size());
+ // } else {
+ // // d2, aligned series
+ // assertEquals(expectedChunkNum() / 2 / seriesNum,
chunks.size());
+ // }
}
}
}
@@ -252,8 +317,9 @@ public class TsFileSplitSenderTest extends TestBase {
consensusGroupIdMapEntry.getValue().entrySet()) {
String uuid = stringMapEntry.getKey();
int command = stringMapEntry.getValue();
- System.out.printf("%s - %s - %s - %s\n", endPoint, consensusGroupId,
uuid,
- LoadCommand.values()[command]);
+ System.out.printf(
+ "%s - %s - %s - %s\n",
+ endPoint, consensusGroupId, uuid, LoadCommand.values()[command]);
assertEquals(LoadCommand.EXECUTE.ordinal(), command);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
index 62b5995b61b..0d307da377e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.queryengine.execution.load;
-import static org.junit.Assert.assertEquals;
-
import org.junit.Test;
import java.io.File;
@@ -28,6 +26,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static org.junit.Assert.assertEquals;
+
public class TsFileSplitterTest extends TestBase {
private List<TsFileData> resultSet = new ArrayList<>();
@@ -44,7 +44,8 @@ public class TsFileSplitterTest extends TestBase {
// System.out.println(tsFileData);
}
System.out.printf(
- "%d/%d splits after %dms\n", resultSet.size(), expectedChunkNum(),
System.currentTimeMillis() - start);
+ "%d/%d splits after %dms\n",
+ resultSet.size(), expectedChunkNum(), System.currentTimeMillis() -
start);
assertEquals(resultSet.size(), expectedChunkNum());
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SequenceUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SequenceUtils.java
index 494737f0446..9adc5db6202 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SequenceUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/SequenceUtils.java
@@ -36,7 +36,7 @@ public class SequenceUtils {
return id * 1.0;
}
- public static class Factory implements DoubleSequenceGeneratorFactory{
+ public static class Factory implements DoubleSequenceGeneratorFactory {
@Override
public DoubleSequenceGenerator create() {
return new SimpleDoubleSequenceGenerator();
@@ -46,32 +46,60 @@ public class SequenceUtils {
public static class UniformDoubleSequenceGenerator implements
DoubleSequenceGenerator {
+ private double bound;
private Random random = new Random();
+
+ public UniformDoubleSequenceGenerator(double bound) {
+ this.bound = bound;
+ }
+
@Override
public double gen(int id) {
- return random.nextDouble();
+ return random.nextDouble() * bound;
}
- public static class Factory implements DoubleSequenceGeneratorFactory{
+ public static class Factory implements DoubleSequenceGeneratorFactory {
+ private double bound;
+
+ public Factory(double bound) {
+ this.bound = bound;
+ }
+
@Override
public DoubleSequenceGenerator create() {
- return new UniformDoubleSequenceGenerator();
+ return new UniformDoubleSequenceGenerator(bound);
}
}
}
public static class GaussianDoubleSequenceGenerator implements
DoubleSequenceGenerator {
+ private double mean;
+ private double stderr;
private Random random = new Random();
+
+ public GaussianDoubleSequenceGenerator(double mean, double stderr) {
+ this.mean = mean;
+ this.stderr = stderr;
+ }
+
@Override
public double gen(int id) {
- return random.nextGaussian();
+ return mean + random.nextGaussian() * stderr;
}
- public static class Factory implements DoubleSequenceGeneratorFactory{
+ public static class Factory implements DoubleSequenceGeneratorFactory {
+ private double mean;
+ private double stderr;
+
+ public Factory(double mean, double stderr) {
+ this.mean = mean;
+ this.stderr = stderr;
+ }
+
@Override
public DoubleSequenceGenerator create() {
- return new GaussianDoubleSequenceGenerator();
+ return new GaussianDoubleSequenceGenerator(mean, stderr);
}
}
}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 0cd4313e43f..e2868734c67 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -310,6 +310,8 @@ struct TTsFilePieceReq{
3: required common.TConsensusGroupId consensusGroupId
// if isRelay is true, the receiver should forward the request to other
replicas in the group
4: optional bool isRelay
+ 5: optional i8 compressionType
+ 6: optional i32 uncompressedLength
}
struct TLoadCommandReq{