This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch opc-fix-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-fix-fix by this push:
new c7876f3f0a4 part
c7876f3f0a4 is described below
commit c7876f3f0a4620918ffa98fe2cd9b75b7813b7b8
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 23 18:53:46 2026 +0800
part
---
.../request/PipeTransferTabletBatchReq.java | 50 +++-------------------
1 file changed, 6 insertions(+), 44 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 48bd1016763..4c426dfbaca 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -45,7 +45,6 @@ import java.util.Objects;
public class PipeTransferTabletBatchReq extends TPipeTransferReq {
- private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new
ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs
= new ArrayList<>();
private final transient List<PipeTransferTabletRawReq> tabletReqs = new
ArrayList<>();
@@ -61,26 +60,6 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new
ArrayList<>();
- for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
- final InsertBaseStatement statement = binaryReq.constructStatement();
- if (statement.isEmpty()) {
- continue;
- }
- if (statement instanceof InsertRowStatement) {
- insertRowStatementList.add((InsertRowStatement) statement);
- } else if (statement instanceof InsertTabletStatement) {
- insertTabletStatementList.add((InsertTabletStatement) statement);
- } else if (statement instanceof InsertRowsStatement) {
- insertRowStatementList.addAll(
- ((InsertRowsStatement) statement).getInsertRowStatementList());
- } else {
- throw new UnsupportedOperationException(
- String.format(
- "unknown InsertBaseStatement %s constructed from
PipeTransferTabletBinaryReq.",
- binaryReq));
- }
- }
-
for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs)
{
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
@@ -117,7 +96,6 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletBatchReq toTPipeTransferReq(
- final List<ByteBuffer> binaryBuffers,
final List<ByteBuffer> insertNodeBuffers,
final List<ByteBuffer> tabletBuffers)
throws IOException {
@@ -130,11 +108,8 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
- for (final ByteBuffer binaryBuffer : binaryBuffers) {
- ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
- outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
- }
+ // Binary buffer, for rolling upgrade
+ ReadWriteIOUtils.write(0, outputStream);
ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
@@ -157,16 +132,9 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
final TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReq batchReq = new
PipeTransferTabletBatchReq();
+ // Binary size, for rolling upgrade
+ ReadWriteIOUtils.readInt(transferReq.body);
int size = ReadWriteIOUtils.readInt(transferReq.body);
- for (int i = 0; i < size; ++i) {
- final int length = ReadWriteIOUtils.readInt(transferReq.body);
- final byte[] body = new byte[length];
- transferReq.body.get(body);
- batchReq.binaryReqs.add(
-
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
- }
-
- size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
@@ -188,11 +156,6 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
/////////////////////////////// TestOnly ///////////////////////////////
- @TestOnly
- public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
- return binaryReqs;
- }
-
@TestOnly
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
return insertNodeReqs;
@@ -214,8 +177,7 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
return false;
}
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
- return binaryReqs.equals(that.binaryReqs)
- && insertNodeReqs.equals(that.insertNodeReqs)
+ return insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
&& version == that.version
&& type == that.type
@@ -224,6 +186,6 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
@Override
public int hashCode() {
- return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type,
body);
+ return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
}
}