This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix-legacy-compat-followup in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7598fc41b366ff1256c6dcbc54309e108f9194ad Author: Caideyipi <[email protected]> AuthorDate: Fri Jun 12 10:49:04 2026 +0800 Fix 1.3.7 binaryBuffers --- .../request/PipeTransferTabletBatchReq.java | 49 +++++++++++++++++++-- .../pipe/sink/PipeDataNodeThriftRequestTest.java | 51 +++++++++++++++++++++- 2 files changed, 95 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index ede3370f5b0..797ea509602 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -45,6 +45,7 @@ import java.util.Objects; public class PipeTransferTabletBatchReq extends TPipeTransferReq { + private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>(); private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>(); private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>(); @@ -60,6 +61,26 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { final List<InsertRowStatement> insertRowStatementList = new ArrayList<>(); final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>(); + for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) { + final InsertBaseStatement statement = binaryReq.constructStatement(); + if (statement.isEmpty()) { + continue; + } + if (statement instanceof InsertRowStatement) { + insertRowStatementList.add((InsertRowStatement) statement); + } else if (statement instanceof InsertTabletStatement) { + insertTabletStatementList.add((InsertTabletStatement) statement); + } else if (statement instanceof InsertRowsStatement) { + insertRowStatementList.addAll( + ((InsertRowsStatement) statement).getInsertRowStatementList()); + } else { + throw new UnsupportedOperationException( + String.format( + "Unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.", + statement)); + } + } + for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) { final InsertBaseStatement statement = insertNodeReq.constructStatement(); if (statement.isEmpty()) { @@ -132,9 +153,23 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq(); final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); - // Binary size, for rolling upgrade - ReadWriteIOUtils.readInt(transferReq.body); + // Legacy 1.3.x batch bodies may carry WAL binary requests before insert nodes and tablets. int size = ReadWriteIOUtils.readInt(transferReq.body); + for (int i = 0; i < size; ++i) { + final int length = ReadWriteIOUtils.readInt(transferReq.body); + if (length < 0 || length > transferReq.body.remaining()) { + throw new IllegalArgumentException( + String.format( + "Invalid binary request body length %s, remaining body length %s.", + length, transferReq.body.remaining())); + } + final byte[] body = new byte[length]; + transferReq.body.get(body); + batchReq.binaryReqs.add( + PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body))); + } + + size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { batchReq.insertNodeReqs.add( PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq( @@ -155,6 +190,11 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { /////////////////////////////// TestOnly /////////////////////////////// + @TestOnly + public List<PipeTransferTabletBinaryReq> getBinaryReqs() { + return binaryReqs; + } + @TestOnly public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() { return insertNodeReqs; @@ -176,7 +216,8 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { return false; } final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj; - return insertNodeReqs.equals(that.insertNodeReqs) + return binaryReqs.equals(that.binaryReqs) + && insertNodeReqs.equals(that.insertNodeReqs) && tabletReqs.equals(that.tabletReqs) && version == that.version && type == that.type @@ -185,6 +226,6 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { @Override public int hashCode() { - return Objects.hash(insertNodeReqs, tabletReqs, version, type, body); + return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 88f7353f519..5e7c6f279bf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -702,6 +702,43 @@ public class PipeDataNodeThriftRequestTest { assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement()); } + @Test + public void testPipeTransferTabletBatchReqFromLegacyV13BodyWithBinaryReqs() throws IOException { + final InsertRowNode node = + new InsertRowNode( + new PlanNodeId(""), + new PartialPath(new String[] {"root", "sg", "d"}), + false, + new String[] {"s"}, + new TSDataType[] {TSDataType.INT32}, + 1, + new Object[] {1}, + false); + final ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] {'a', 'b'}); + + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TABLET_BATCH, + serializeLegacyTabletBatchBody( + Collections.singletonList(binaryBuffer), + Collections.singletonList(node.serializeToByteBuffer()), + Collections.singletonList(serializeLegacyTabletRawBuffer(false)))); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializedReq.getVersion()); + Assert.assertEquals(req.getType(), deserializedReq.getType()); + Assert.assertEquals(1, deserializedReq.getBinaryReqs().size()); + Assert.assertArrayEquals( + new byte[] {'a', 'b'}, + byteBufferToByteArray(deserializedReq.getBinaryReqs().get(0).getByteBuffer())); + Assert.assertEquals(1, deserializedReq.getInsertNodeReqs().size()); + Assert.assertEquals(1, deserializedReq.getTabletReqs().size()); + Assert.assertEquals(node, deserializedReq.getInsertNodeReqs().get(0).getInsertNode()); + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement()); + } + @Test public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOException { final TPipeTransferReq req = new TPipeTransferReq(); @@ -1271,9 +1308,21 @@ public class PipeDataNodeThriftRequestTest { private static ByteBuffer serializeLegacyTabletBatchBody( final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> tabletBuffers) throws IOException { + return serializeLegacyTabletBatchBody(Collections.emptyList(), insertNodeBuffers, tabletBuffers); + } + + private static ByteBuffer serializeLegacyTabletBatchBody( + final List<ByteBuffer> binaryBuffers, + final List<ByteBuffer> insertNodeBuffers, + final List<ByteBuffer> tabletBuffers) + throws IOException { try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - ReadWriteIOUtils.write(0, outputStream); + ReadWriteIOUtils.write(binaryBuffers.size(), outputStream); + for (final ByteBuffer binaryBuffer : binaryBuffers) { + ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream); + writeByteBuffer(outputStream, binaryBuffer); + } ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream); for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
