This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load_v2 by this push:
new e7562e8a60b add comments
e7562e8a60b is described below
commit e7562e8a60b98620f6acd13c76fe715d21b42fce
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Sep 27 10:50:53 2023 +0800
add comments
---
.../load/DeviceBatchTsFileDataManager.java | 1 +
.../execution/load/LoadTsFileManager.java | 4 +++
.../nodesplit/ClusteringMeasurementSplitter.java | 40 ++++++++--------------
.../execution/load/LoadTsFileManagerTest.java | 5 ++-
4 files changed, 24 insertions(+), 26 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
index b343c0b4a06..ed3f92ff925 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
@@ -45,6 +45,7 @@ public class DeviceBatchTsFileDataManager extends TsFileDataManager {
return false;
}
}
+ // add the chunk into the batch
currentDeviceId = chunkData.getDevice();
nonDirectionalChunkData.add(chunkData);
dataSize += chunkData.getDataSize();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 3768da85af1..6b055c55db5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -218,8 +218,10 @@ public class LoadTsFileManager {
}
}
+ // method is synchronized because the chunks in a chunk group may be sent
in parallel
@SuppressWarnings("squid:S3824")
private synchronized void write(DataPartitionInfo partitionInfo, ChunkData
chunkData) throws IOException {
+ // ensure that retransmission will not result in writing duplicated data
if (receivedSplitIds.contains(chunkData.getSplitId())) {
return;
}
@@ -249,7 +251,9 @@ public class LoadTsFileManager {
receivedSplitIds.add(chunkData.getSplitId());
}
+ // method is synchronized because the chunks in a chunk group may be sent
in parallel
private synchronized void writeDeletion(TsFileData deletionData) throws
IOException {
+ // ensure that retransmission will not result in writing duplicated data
if (receivedSplitIds.contains(deletionData.getSplitId())) {
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
index 7c53a6295b1..3994941c8b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
@@ -19,16 +19,24 @@
package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
-import com.fasterxml.jackson.databind.ObjectReader;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
-import java.util.Iterator;
-import java.util.OptionalDouble;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
-import java.util.stream.LongStream;
-import net.ricecode.similarity.DiceCoefficientStrategy;
-import net.ricecode.similarity.JaroStrategy;
+import java.util.stream.Collectors;
import net.ricecode.similarity.LevenshteinDistanceStrategy;
+import net.ricecode.similarity.SimilarityStrategy;
+import net.ricecode.similarity.StringSimilarityService;
+import net.ricecode.similarity.StringSimilarityServiceImpl;
import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
@@ -39,24 +47,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Chunk;
-
-import net.ricecode.similarity.JaroWinklerStrategy;
-import net.ricecode.similarity.SimilarityStrategy;
-import net.ricecode.similarity.StringSimilarityService;
-import net.ricecode.similarity.StringSimilarityServiceImpl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
index 97e98c0ce4e..046c52794fb 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@ -94,6 +94,7 @@ public class LoadTsFileManagerTest extends TestBase {
ConsensusGroupId d1GroupId =
Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
DataRegion dataRegion =
dataRegionMap.get(d1GroupId.convertToTConsensusGroupId());
List<TsFileResource> tsFileList =
dataRegion.getTsFileManager().getTsFileList(false);
+ // all input files should be shallow-merged into one
System.out.printf("Loaded TsFiles: %s\n", tsFileList);
assertEquals(1, tsFileList.size());
@@ -101,16 +102,18 @@ public class LoadTsFileManagerTest extends TestBase {
long chunkTimeRange = (long) (timePartitionInterval * chunkTimeRangeRatio);
int chunkPointNum = (int) (chunkTimeRange / pointInterval);
long endTime = chunkTimeRange * (fileNum - 1) + pointInterval *
(chunkPointNum - 1);
-
+ // check device time
TsFileResource tsFileResource = tsFileList.get(0);
for (int i = 0; i < deviceNum; i++) {
assertEquals(0, tsFileResource.getStartTime("d" + i));
assertEquals(endTime, tsFileResource.getEndTime("d" + i));
}
+ // read and check the generated file
try (TsFileReader reader = new TsFileReader(
new TsFileSequenceReader(tsFileResource.getTsFile().getPath()))) {
for (int dn = 0; dn < deviceNum; dn++) {
+ // "Simple_" is generated with linear function and is easy to check
QueryExpression queryExpression = QueryExpression.create(
Collections.singletonList(new Path("d" + dn, "Simple_22", false)),
null);
QueryDataSet dataSet = reader.query(queryExpression);