This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load_v2 by this push:
new 445fe8796cc add LoadTsFileManagerTest
445fe8796cc is described below
commit 445fe8796cca6c9f2ba801890ac9f9f6b1a9d2b1
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Sep 27 10:41:33 2023 +0800
add LoadTsFileManagerTest
---
.../execution/load/LoadTsFileManager.java | 16 +-
.../execution/load/LoadTsFileManagerTest.java | 196 +++++++++++++++++++++
.../db/queryengine/execution/load/TestBase.java | 52 ++++--
.../execution/load/TsFileSplitSenderTest.java | 1 +
4 files changed, 248 insertions(+), 17 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 97542ceab24..3768da85af1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.queryengine.execution.load;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -195,12 +197,14 @@ public class LoadTsFileManager {
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
private boolean isClosed;
+ private Set<Integer> receivedSplitIds;
private TsFileWriterManager(File taskDir) {
this.taskDir = taskDir;
this.dataPartition2Writer = new HashMap<>();
this.dataPartition2LastDevice = new HashMap<>();
this.isClosed = false;
+ this.receivedSplitIds = new HashSet<>();
clearDir(taskDir);
}
@@ -215,7 +219,10 @@ public class LoadTsFileManager {
}
@SuppressWarnings("squid:S3824")
- private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
throws IOException {
+ private synchronized void write(DataPartitionInfo partitionInfo, ChunkData
chunkData) throws IOException {
+ if (receivedSplitIds.contains(chunkData.getSplitId())) {
+ return;
+ }
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
@@ -239,15 +246,20 @@ public class LoadTsFileManager {
dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
}
chunkData.writeToFileWriter(writer);
+ receivedSplitIds.add(chunkData.getSplitId());
}
- private void writeDeletion(TsFileData deletionData) throws IOException {
+ private synchronized void writeDeletion(TsFileData deletionData) throws
IOException {
+ if (receivedSplitIds.contains(deletionData.getSplitId())) {
+ return;
+ }
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
dataPartition2Writer.entrySet()) {
deletionData.writeToFileWriter(entry.getValue());
}
+ receivedSplitIds.add(deletionData.getSplitId());
}
private void loadAll(boolean isGeneratedByPipe) throws IOException,
LoadFileException {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
new file mode 100644
index 00000000000..97e98c0ce4e
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.read.TsFileReader;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+/** Sends split TsFiles into a {@link LoadTsFileManager} and verifies the loaded result. */
+public class LoadTsFileManagerTest extends TestBase {
+
+  private static final long MAX_SPLIT_SIZE = 128 * 1024 * 1024;
+  private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
+
+  /** Overrides the data-set size (fileNum, seriesNum, deviceNum) before the base setup runs. */
+  @Override
+  public void setup() throws IOException, WriteProcessException, DataRegionException {
+    fileNum = 10;
+    seriesNum = 100;
+    deviceNum = 100;
+    super.setup();
+  }
+
+  /** Expects a single loaded TsFile whose "Simple_22" series is complete for every device. */
+  @Test
+  public void test() throws IOException {
+    LoadTsFileNode loadTsFileNode =
+        new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
+    DataPartitionBatchFetcher partitionBatchFetcher = dummyDataPartitionBatchFetcher();
+    TsFileSplitSender splitSender =
+        new TsFileSplitSender(
+            loadTsFileNode,
+            partitionBatchFetcher,
+            TimePartitionUtils.getTimePartitionInterval(),
+            internalServiceClientManager,
+            false,
+            MAX_SPLIT_SIZE,
+            100);
+    long start = System.currentTimeMillis();
+    splitSender.start();
+    System.out.printf("Split ends after %dms%n", System.currentTimeMillis() - start);
+
+    ConsensusGroupId d1GroupId = Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
+    DataRegion dataRegion = dataRegionMap.get(d1GroupId.convertToTConsensusGroupId());
+    List<TsFileResource> tsFileList = dataRegion.getTsFileManager().getTsFileList(false);
+    System.out.printf("Loaded TsFiles: %s%n", tsFileList);
+    assertEquals(1, tsFileList.size());
+    // expected per-device time range, derived from the inherited TestBase parameters
+    long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval();
+    long chunkTimeRange = (long) (timePartitionInterval * chunkTimeRangeRatio);
+    int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+    long endTime = chunkTimeRange * (fileNum - 1) + pointInterval * (chunkPointNum - 1);
+
+    TsFileResource tsFileResource = tsFileList.get(0);
+    for (int i = 0; i < deviceNum; i++) {
+      assertEquals(0, tsFileResource.getStartTime("d" + i));
+      assertEquals(endTime, tsFileResource.getEndTime("d" + i));
+    }
+    // read each device back: timestamps and values must follow the expected chunk pattern
+    try (TsFileReader reader = new TsFileReader(
+        new TsFileSequenceReader(tsFileResource.getTsFile().getPath()))) {
+      for (int dn = 0; dn < deviceNum; dn++) {
+        QueryExpression queryExpression = QueryExpression.create(
+            Collections.singletonList(new Path("d" + dn, "Simple_22", false)), null);
+        QueryDataSet dataSet = reader.query(queryExpression);
+        int i = 0;
+        while (dataSet.hasNext()) {
+          RowRecord row = dataSet.next();
+          long currTime =
+              chunkTimeRange * (i / chunkPointNum) + pointInterval * (i % chunkPointNum);
+          assertEquals(currTime, row.getTimestamp());
+          assertEquals(1.0 * (i % chunkPointNum), row.getFields().get(0).getDoubleV(), 0.0001);
+          i++;
+        }
+        assertEquals(chunkPointNum * fileNum, i);
+      }
+    }
+  }
+
+  // NOTE(review): presumably overrides a TestBase hook; confirm and add @Override.
+  /** Decodes a piece request, writes it to the data region and relays it to other replicas. */
+  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint tEndpoint)
+      throws TException, IOException {
+    ConsensusGroupId groupId = Factory.createFromTConsensusGroupId(req.consensusGroupId);
+    ByteBuffer buf = req.body.slice();
+    if (req.isSetCompressionType()) {
+      CompressionType compressionType = CompressionType.deserialize(req.compressionType);
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
+      int uncompressedLength = req.getUncompressedLength();
+      ByteBuffer allocate = ByteBuffer.allocate(uncompressedLength);
+      unCompressor.uncompress(
+          buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(), allocate.array(), 0);
+      buf = allocate;
+    }
+
+    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(buf);
+    loadTsFileManager.writeToDataRegion(
+        dataRegionMap.get(req.consensusGroupId), pieceNode, req.uuid);
+
+    // forward to the other replicas; isRelay is cleared first so they do not relay again
+    if (req.isRelay) {
+      req.isRelay = false;
+      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+      regionReplicaSet.getDataNodeLocations().stream().parallel().forEach(dataNodeLocation -> {
+        TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+        if (!otherPoint.equals(tEndpoint)) {
+          try {
+            handleTsFilePieceNode(req, otherPoint);
+          } catch (TException | IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    return new TLoadResp()
+        .setAccepted(true)
+        .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  /** Relays the load command to the other replicas, then runs loadAll for the request uuid. */
+  @Override
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint tEndpoint)
+      throws LoadFileException, IOException {
+    ConsensusGroupId groupId = Factory.createFromTConsensusGroupId(req.consensusGroupId);
+
+    // forward to other replicas in the group
+    if (req.useConsensus) {
+      req.useConsensus = false;
+      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+        if (!otherPoint.equals(tEndpoint)) {
+          handleTsLoadCommand(req, otherPoint);
+        }
+      }
+    }
+
+    loadTsFileManager.loadAll(req.uuid, false);
+
+    return new TLoadResp()
+        .setAccepted(true)
+        .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index fab94c1751b..a8e7ee3b2bf 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.queryengine.execution.load;
import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
import java.util.Comparator;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -41,8 +43,15 @@ import
org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
+import
org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.utils.SequenceUtils.DoubleSequenceGenerator;
import org.apache.iotdb.db.utils.SequenceUtils.DoubleSequenceGeneratorFactory;
import org.apache.iotdb.db.utils.SequenceUtils.GaussianDoubleSequenceGenerator;
@@ -91,11 +100,11 @@ import java.util.stream.IntStream;
public class TestBase {
private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
- public static final String BASE_OUTPUT_PATH =
"target".concat(File.separator);
+ public static final String BASE_OUTPUT_PATH =
"target".concat(File.separator).concat("loadTest");
public static final String PARTIAL_PATH_STRING =
"%s" + File.separator + "%d" + File.separator + "%d" + File.separator;
public static final String TEST_TSFILE_PATH =
- BASE_OUTPUT_PATH + "testTsFile".concat(File.separator) +
PARTIAL_PATH_STRING;
+ BASE_OUTPUT_PATH + File.separator + "testTsFile".concat(File.separator)
+ PARTIAL_PATH_STRING;
protected int fileNum = 100;
// series number of each file, sn non-aligned series and 1 aligned series
with sn measurements
@@ -117,6 +126,7 @@ public class TestBase {
protected IPartitionFetcher partitionFetcher;
// the key is deviceId, not partitioned by time in the simple test
protected Map<String, TRegionReplicaSet> partitionTable = new HashMap<>();
+ protected Map<TConsensusGroupId, DataRegion> dataRegionMap = new HashMap<>();
protected Map<ConsensusGroupId, TRegionReplicaSet> groupId2ReplicaSetMap =
new HashMap<>();
protected IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
@@ -125,7 +135,7 @@ public class TestBase {
private int groupSizeInByte;
@Before
- public void setup() throws IOException, WriteProcessException {
+ public void setup() throws IOException, WriteProcessException,
DataRegionException {
setupFiles();
logger.info("{} files set up", files.size());
partitionFetcher = dummyPartitionFetcher();
@@ -137,9 +147,8 @@ public class TestBase {
@After
public void cleanup() {
- for (File file : files) {
- file.delete();
- }
+ FileUtils.deleteDirectory(new File(BASE_OUTPUT_PATH));
+ FileUtils.deleteDirectory(new File("target" + File.separator + "data"));
TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(groupSizeInByte);
}
@@ -165,7 +174,8 @@ public class TestBase {
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint) {
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint)
+ throws LoadFileException, IOException {
return new TLoadResp()
.setAccepted(true)
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
@@ -195,8 +205,12 @@ public class TestBase {
}
@Override
- public TLoadResp sendLoadCommand(TLoadCommandReq req) {
- return handleTsLoadCommand(req, getTEndpoint());
+ public TLoadResp sendLoadCommand(TLoadCommandReq req) throws
TException {
+ try {
+ return handleTsLoadCommand(req, getTEndpoint());
+ } catch (LoadFileException | IOException e) {
+ throw new TException(e);
+ }
}
@Override
@@ -218,7 +232,7 @@ public class TestBase {
};
}
- public void setupPartitionTable() {
+ public void setupPartitionTable() throws DataRegionException {
ConsensusGroupId d1GroupId =
Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
TRegionReplicaSet d1Replicas =
new TRegionReplicaSet(
@@ -233,8 +247,14 @@ public class TestBase {
new TDataNodeLocation()
.setDataNodeId(2)
.setInternalEndPoint(new TEndPoint("localhost", 10002))));
+
+ WALRecoverManager.getInstance()
+ .setAllDataRegionScannedLatch(new CountDownLatch(0));
+ DataRegion dataRegion = new DataRegion(BASE_OUTPUT_PATH,
d1GroupId.toString(),
+ new DirectFlushPolicy(), "root.loadTest");
for (int i = 0; i < deviceNum; i++) {
partitionTable.put("d" + i, d1Replicas);
+ dataRegionMap.put(d1GroupId.convertToTConsensusGroupId(), dataRegion);
}
groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
@@ -334,8 +354,9 @@ public class TestBase {
schemaGeneratorPairs.add(new Pair<>(measurementSchema,
measurementGeneratorPair.right));
}
schemaGeneratorPairs.sort(Comparator.comparing(s ->
s.left.getMeasurementId()));
- List<MeasurementSchema> measurementSchemas =
schemaGeneratorPairs.stream().map(m -> m.left).collect(
- Collectors.toList());
+ List<MeasurementSchema> measurementSchemas =
schemaGeneratorPairs.stream().map(m -> m.left)
+ .collect(
+ Collectors.toList());
IntStream.range(0, fileNum)
.parallel()
.forEach(
@@ -352,7 +373,8 @@ public class TestBase {
// dd2
for (int sn = 0; sn < seriesNum; sn++) {
for (int dn = 0; dn < deviceNum; dn++) {
- writer.registerTimeseries(new Path("d"+dn),
schemaGeneratorPairs.get(sn).left);
+ writer.registerTimeseries(new Path("d" + dn),
+ schemaGeneratorPairs.get(sn).left);
}
}
writer.registerAlignedTimeseries(new Path("dd1"),
measurementSchemas);
@@ -387,8 +409,8 @@ public class TestBase {
writer.flushAllChunkGroups();
for (int dn = 0; dn < deviceNum; dn++) {
- tsFileResource.updateStartTime("d"+dn, chunkTimeRange * i);
- tsFileResource.updateEndTime("d"+dn, chunkTimeRange * (i +
1));
+ tsFileResource.updateStartTime("d" + dn, chunkTimeRange *
i);
+ tsFileResource.updateEndTime("d" + dn, chunkTimeRange * (i
+ 1));
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index 818e3f80d25..a1509216852 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -120,6 +120,7 @@ public class TsFileSplitSenderTest extends TestBase {
long start = System.currentTimeMillis();
splitSender.start();
long timeConsumption = System.currentTimeMillis() - start;
+ thread.interrupt();
printPhaseResult();
long transmissionTime = splitSender.getStatistic().compressedSize.get() /
nodeThroughput;