This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6a59cf43a41 Optimize local load TsFile piece dispatch (#17851) (#17876)
6a59cf43a41 is described below
commit 6a59cf43a416189c796fcdcdcc7e5d6a11ab4fa2
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 18:17:35 2026 +0800
Optimize local load TsFile piece dispatch (#17851) (#17876)
---
.../scheduler/load/LoadTsFileDispatcherImpl.java | 8 +-
.../load/splitter/AlignedChunkData.java | 8 +-
.../load/splitter/NonAlignedChunkData.java | 14 ++++
.../load/LoadTsFileDispatcherImplTest.java | 98 ++++++++++++++++++++++
.../load/splitter/ChunkDataDirectWriteTest.java | 89 ++++++++++++++++++++
5 files changed, 209 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index c0703113cdf..37f2fba832d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
@@ -156,12 +155,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
PlanNode planNode = instance.getFragment().getPlanNodeTree();
if (planNode instanceof LoadTsFilePieceNode) { // split
- LoadTsFilePieceNode pieceNode =
- (LoadTsFilePieceNode) PlanNodeType.deserialize(planNode.serializeToByteBuffer());
- if (pieceNode == null) {
- throw new FragmentInstanceDispatchException(
- new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
- }
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) planNode;
TSStatus resultStatus =
StorageEngine.getInstance().writeLoadTsFileNode((DataRegionId) groupId, pieceNode, uuid);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index 86bd02c9947..8567d30660d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -287,7 +287,7 @@ public class AlignedChunkData implements ChunkData {
}
protected void deserializeTsFileData(TsFileIOWriter writer) throws IOException, PageException {
- final InputStream stream = new ByteArrayInputStream(chunkData);
+ final InputStream stream = createTsFileDataInputStream();
if (needDecodeChunk) {
buildChunkWriter(stream, writer);
} else {
@@ -295,6 +295,12 @@ public class AlignedChunkData implements ChunkData {
}
}
+ private InputStream createTsFileDataInputStream() {
+ return chunkData == null
+ ? new ByteArrayInputStream(byteStream.getBuf(), 0, byteStream.size())
+ : new ByteArrayInputStream(chunkData);
+ }
+
protected void deserializeTsFileDataByte(final InputStream stream) throws IOException {
final int size = ReadWriteIOUtils.readInt(stream);
this.chunkData = new byte[size];
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
index 2d8a75052ff..6c5504e7a99 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
@@ -38,6 +38,7 @@ import org.apache.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.writer.TsFileIOWriter;
+import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -113,6 +114,7 @@ public class NonAlignedChunkData implements ChunkData {
@Override
public void writeToFileWriter(final TsFileIOWriter writer) throws IOException {
+ ensureDataReadyForWriting();
if (chunk != null) {
writer.writeChunk(chunk);
} else {
@@ -120,6 +122,18 @@ public class NonAlignedChunkData implements ChunkData {
}
}
+ private void ensureDataReadyForWriting() throws IOException {
+ if (chunk != null || chunkWriter != null) {
+ return;
+ }
+
+ try {
+ deserializeTsFileData(new ByteArrayInputStream(byteStream.getBuf(), 0, byteStream.size()));
+ } catch (final PageException e) {
+ throw new IOException(e);
+ }
+ }
+
@Override
public void serialize(final DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(isModification(), stream);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
new file mode 100644
index 00000000000..3d496470597
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.scheduler.load;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.partition.StorageExecutor;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.util.Collections;
+
+@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StorageEngine.class)
+public class LoadTsFileDispatcherImplTest {
+
+ @Test
+ public void testDispatchLocallyPieceNodeSkipsSerdeRoundTrip() throws Exception {
+ final StorageEngine storageEngine = Mockito.mock(StorageEngine.class);
+ PowerMockito.mockStatic(StorageEngine.class);
+ PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine);
+
+ final LoadTsFileDispatcherImpl dispatcher = new LoadTsFileDispatcherImpl(null, false);
+ dispatcher.setUuid("test-uuid");
+
+ final LoadTsFilePieceNode pieceNode =
+ new LoadTsFilePieceNode(new PlanNodeId("piece"), new File("test.tsfile"));
+ final FragmentInstance instance = createFragmentInstance(pieceNode);
+
+ Mockito.when(
+ storageEngine.writeLoadTsFileNode(
+ Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid")))
+ .thenReturn(RpcUtils.SUCCESS_STATUS);
+
+ dispatcher.dispatchLocally(instance);
+
+ Mockito.verify(storageEngine)
+ .writeLoadTsFileNode(
+ Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), Mockito.eq("test-uuid"));
+ }
+
+ private static FragmentInstance createFragmentInstance(final LoadTsFilePieceNode pieceNode) {
+ final PlanFragmentId fragmentId = new PlanFragmentId("test", 0);
+ final FragmentInstance instance =
+ new FragmentInstance(
+ new PlanFragment(fragmentId, pieceNode),
+ fragmentId.genFragmentInstanceId(),
+ null,
+ null,
+ 0,
+ null,
+ false,
+ false);
+ final TConsensusGroupId consensusGroupId = new DataRegionId(1).convertToTConsensusGroupId();
+ instance.setExecutorAndHost(
+ new StorageExecutor(
+ new TRegionReplicaSet(
+ consensusGroupId,
+ Collections.singletonList(
+ new TDataNodeLocation().setInternalEndPoint(new TEndPoint("127.0.0.1", 1))))));
+ return instance;
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java
new file mode 100644
index 00000000000..e98541a9125
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.splitter;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+
+public class ChunkDataDirectWriteTest {
+
+ @Test
+ public void testNonAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception {
+ final NonAlignedChunkData chunkData = createNonAlignedChunkData();
+ chunkData.setNotDecode();
+ final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
+ Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
+ chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
+
+ final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
+ chunkData.writeToFileWriter(writer);
+
+ Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
+ }
+
+ @Test
+ public void testAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws Exception {
+ final AlignedChunkData chunkData = createAlignedChunkData();
+ chunkData.setNotDecode();
+ final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
+ Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
+ chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
+
+ final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
+ chunkData.writeToFileWriter(writer);
+
+ Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
+ }
+
+ private static Statistics<?> createInt32Statistics() {
+ final Statistics<?> statistics = Statistics.getStatsByType(TSDataType.INT32);
+ statistics.update(1L, 1);
+ return statistics;
+ }
+
+ private static NonAlignedChunkData createNonAlignedChunkData() {
+ return (NonAlignedChunkData)
+ ChunkData.createChunkData(
+ false, "root.sg.d1", createChunkHeader(), new TTimePartitionSlot(0L));
+ }
+
+ private static AlignedChunkData createAlignedChunkData() {
+ return (AlignedChunkData)
+ ChunkData.createChunkData(
+ true, "root.sg.d1", createChunkHeader(), new TTimePartitionSlot(0L));
+ }
+
+ private static ChunkHeader createChunkHeader() {
+ return new ChunkHeader(
+ "temperature", 0, TSDataType.INT32, CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, 0);
+ }
+}