This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4747d5f243f Optimize local load TsFile piece dispatch (#17851)
4747d5f243f is described below
commit 4747d5f243f04d0ef293b29beb8209cb02c463e4
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 11:35:05 2026 +0800
Optimize local load TsFile piece dispatch (#17851)
---
.../scheduler/load/LoadTsFileDispatcherImpl.java | 8 +-
.../load/splitter/AlignedChunkData.java | 9 ++
.../load/splitter/NonAlignedChunkData.java | 16 ++++
.../load/LoadTsFileDispatcherImplTest.java | 98 ++++++++++++++++++++++
.../load/splitter/ChunkDataDirectWriteTest.java | 91 ++++++++++++++++++++
5 files changed, 215 insertions(+), 7 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index fcd884d696d..bd5356ccff4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -161,12 +160,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
PlanNode planNode = instance.getFragment().getPlanNodeTree();
if (planNode instanceof LoadTsFilePieceNode) { // split
- LoadTsFilePieceNode pieceNode =
- (LoadTsFilePieceNode) PlanNodeType.deserialize(planNode.serializeToByteBuffer());
- if (pieceNode == null) {
- throw new FragmentInstanceDispatchException(
- new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
- }
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) planNode;
TSStatus resultStatus =
StorageEngine.getInstance().writeLoadTsFileNode((DataRegionId) groupId, pieceNode, uuid);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index 2c9cea06852..d852ec3c9b8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -290,6 +290,7 @@ public class AlignedChunkData implements ChunkData {
}
protected void writeTsFileData(TsFileIOWriter writer) throws IOException, PageException {
+ ensureDataReadyForWriting();
final InputStream stream = new LoadTsFilePieceNode.ByteBufferInputStream(chunkData);
if (needDecodeChunk) {
writeChunkToWriter(stream, writer);
@@ -298,6 +299,14 @@ public class AlignedChunkData implements ChunkData {
}
}
+ private void ensureDataReadyForWriting() throws IOException {
+ if (chunkData != null) {
+ chunkData.rewind();
+ return;
+ }
+ chunkData = ByteBuffer.wrap(byteStream.getBuf(), 0, byteStream.size());
+ }
+
protected void deserializeTsFileDataByte(final InputStream stream) throws IOException {
final int size = ReadWriteIOUtils.readInt(stream);
if (stream instanceof LoadTsFilePieceNode.ByteBufferInputStream) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
index 2310b9cb95c..7b3c2fd3f95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.header.ChunkHeader;
@@ -114,6 +115,7 @@ public class NonAlignedChunkData implements ChunkData {
@Override
public void writeToFileWriter(final TsFileIOWriter writer) throws IOException {
+ ensureDataReadyForWriting();
if (chunk != null) {
writer.writeChunk(chunk);
} else {
@@ -121,6 +123,20 @@ public class NonAlignedChunkData implements ChunkData {
}
}
+ private void ensureDataReadyForWriting() throws IOException {
+ if (chunk != null || chunkWriter != null) {
+ return;
+ }
+
+ try {
+ deserializeTsFileData(
+ new LoadTsFilePieceNode.ByteBufferInputStream(
+ ByteBuffer.wrap(byteStream.getBuf(), 0, byteStream.size())));
+ } catch (final PageException e) {
+ throw new IOException(e);
+ }
+ }
+
@Override
public void serialize(final DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(getType().ordinal(), stream);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
new file mode 100644
index 00000000000..2da982fd23d
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.scheduler.load;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.partition.StorageExecutor;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.util.Collections;
+
+@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StorageEngine.class)
+public class LoadTsFileDispatcherImplTest {
+
+ @Test
+ public void testDispatchLocallyPieceNodeSkipsSerdeRoundTrip() throws Exception {
+ final StorageEngine storageEngine = Mockito.mock(StorageEngine.class);
+ PowerMockito.mockStatic(StorageEngine.class);
+ PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine);
+
+ final LoadTsFileDispatcherImpl dispatcher = new LoadTsFileDispatcherImpl(null, false);
+ dispatcher.setUuid("test-uuid");
+
+ final LoadTsFilePieceNode pieceNode =
+ new LoadTsFilePieceNode(new PlanNodeId("piece"), new File("test.tsfile"));
+ final FragmentInstance instance = createFragmentInstance(pieceNode);
+
+ Mockito.when(
+ storageEngine.writeLoadTsFileNode(
+ Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid")))
+ .thenReturn(RpcUtils.SUCCESS_STATUS);
+
+ dispatcher.dispatchLocally(instance);
+
+ Mockito.verify(storageEngine)
+ .writeLoadTsFileNode(
+ Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid"));
+ }
+
+ private static FragmentInstance createFragmentInstance(final LoadTsFilePieceNode pieceNode) {
+ final PlanFragmentId fragmentId = new PlanFragmentId("test", 0);
+ final FragmentInstance instance =
+ new FragmentInstance(
+ new PlanFragment(fragmentId, pieceNode),
+ fragmentId.genFragmentInstanceId(),
+ null,
+ null,
+ 0,
+ null,
+ false,
+ false);
+ final TConsensusGroupId consensusGroupId = new DataRegionId(1).convertToTConsensusGroupId();
+ instance.setExecutorAndHost(
+ new StorageExecutor(
+ new TRegionReplicaSet(
+ consensusGroupId,
+ Collections.singletonList(
+ new TDataNodeLocation().setInternalEndPoint(new TEndPoint("127.0.0.1", 1))))));
+ return instance;
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java
new file mode 100644
index 00000000000..c824a6c0ce9
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.splitter;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+
+public class ChunkDataDirectWriteTest {
+
+ @Test
+ public void testNonAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception {
+ final NonAlignedChunkData chunkData = createNonAlignedChunkData();
+ chunkData.setNotDecode();
+ final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
+ Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
+ chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
+
+ final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
+ chunkData.writeToFileWriter(writer);
+
+ Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
+ }
+
+ @Test
+ public void testAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception {
+ final AlignedChunkData chunkData = createAlignedChunkData();
+ chunkData.setNotDecode();
+ final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
+ Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
+ chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
+
+ final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
+ chunkData.writeToFileWriter(writer);
+
+ Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
+ }
+
+ private static Statistics<?> createInt32Statistics() {
+ final Statistics<?> statistics = Statistics.getStatsByType(TSDataType.INT32);
+ statistics.update(1L, 1);
+ return statistics;
+ }
+
+ private static NonAlignedChunkData createNonAlignedChunkData() {
+ final IDeviceID device = new StringArrayDeviceID("root", "sg", "d1");
+ return (NonAlignedChunkData)
+ ChunkData.createChunkData(false, device, createChunkHeader(), new TTimePartitionSlot(0L));
+ }
+
+ private static AlignedChunkData createAlignedChunkData() {
+ final IDeviceID device = new StringArrayDeviceID("root", "sg", "d1");
+ return (AlignedChunkData)
+ ChunkData.createChunkData(true, device, createChunkHeader(), new TTimePartitionSlot(0L));
+ }
+
+ private static ChunkHeader createChunkHeader() {
+ return new ChunkHeader(
+ "temperature", 0, TSDataType.INT32, CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, 0);
+ }
+}