This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch opc-fix-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/opc-fix-fix by this push:
     new bb87e11a538 fix
bb87e11a538 is described below

commit bb87e11a538d9256a81aac75c558b6f2e6cd2e86
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 23 19:12:24 2026 +0800

    fix
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 16 +----
 .../evolvable/batch/PipeTabletEventPlainBatch.java | 11 +---
 .../request/PipeTransferTabletBatchReq.java        |  5 +-
 .../request/PipeTransferTabletBatchReqV2.java      | 75 ++--------------------
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java | 19 ++----
 .../iotconsensusv2/IoTConsensusV2AsyncSink.java    | 13 +---
 .../iotconsensusv2/IoTConsensusV2SyncSink.java     | 20 ++----
 .../IoTConsensusV2TransferBatchReqBuilder.java     | 16 ++---
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  6 +-
 .../thrift/sync/IoTDBDataRegionSyncSink.java       | 17 ++---
 .../sink/protocol/writeback/WriteBackSink.java     | 14 +---
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   | 11 +---
 12 files changed, 39 insertions(+), 184 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index b576ac507b8..1cb50747479 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -53,7 +53,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
-import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.collector.TabletCollector;
@@ -69,7 +68,8 @@ import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -159,18 +159,11 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
     this.allocatedMemoryBlock = new AtomicReference<>();
   }
 
+  @Nonnull
   public InsertNode getInsertNode() {
     return insertNode;
   }
 
-  public ByteBuffer getByteBuffer() throws WALPipeException {
-    final InsertNode node = insertNode;
-    if (Objects.isNull(node)) {
-      throw new PipeException("InsertNode has been released");
-    }
-    return node.serializeToByteBuffer();
-  }
-
   public String getDeviceId() {
     final InsertNode node = insertNode;
     if (Objects.isNull(node)) {
@@ -399,9 +392,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
   public boolean mayEventPathsOverlappedWithPattern() {
     try {
       final InsertNode insertNode = getInsertNode();
-      if (Objects.isNull(insertNode)) {
-        return true;
-      }
 
       if (insertNode instanceof RelationalInsertRowNode
           || insertNode instanceof RelationalInsertTabletNode
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 0db905a6dc0..065ad3be840 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -51,12 +51,10 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
-  private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
   private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
   private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
 
   private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
-  private final List<String> binaryDataBases = new ArrayList<>();
   private final List<String> insertNodeDataBases = new ArrayList<>();
   private final List<String> tabletDataBases = new ArrayList<>();
 
@@ -90,11 +88,9 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   public synchronized void onSuccess() {
     super.onSuccess();
 
-    binaryBuffers.clear();
     insertNodeBuffers.clear();
     tabletBuffers.clear();
 
-    binaryDataBases.clear();
     insertNodeDataBases.clear();
     tabletDataBases.clear();
     tableModelTabletMap.clear();
@@ -142,12 +138,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     tableModelTabletMap.clear();
 
     return PipeTransferTabletBatchReqV2.toTPipeTransferReq(
-        binaryBuffers,
-        insertNodeBuffers,
-        tabletBuffers,
-        binaryDataBases,
-        insertNodeDataBases,
-        tabletDataBases);
+        insertNodeBuffers, tabletBuffers, insertNodeDataBases, 
tabletDataBases);
   }
 
   public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 4c426dfbaca..8d75e9864bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -96,12 +96,11 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletBatchReq toTPipeTransferReq(
-      final List<ByteBuffer> insertNodeBuffers,
-      final List<ByteBuffer> tabletBuffers)
+      final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> 
tabletBuffers)
       throws IOException {
     final PipeTransferTabletBatchReq batchReq = new 
PipeTransferTabletBatchReq();
 
-    // batchReq.binaryReqs, batchReq.insertNodeReqs, batchReq.tabletReqs are 
empty
+    // batchReq.insertNodeReqs, batchReq.tabletReqs are empty
     // when this method is called from 
PipeTransferTabletBatchReqBuilder.toTPipeTransferReq()
 
     batchReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index c136ffbe7d3..f626d496b55 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -44,8 +44,6 @@ import java.util.Map;
 import java.util.Objects;
 
 public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq {
-
-  private final transient List<PipeTransferTabletBinaryReqV2> binaryReqs = new 
ArrayList<>();
   private final transient List<PipeTransferTabletInsertNodeReqV2> 
insertNodeReqs =
       new ArrayList<>();
   private final transient List<PipeTransferTabletRawReqV2> tabletReqs = new 
ArrayList<>();
@@ -66,45 +64,6 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
     final Map<String, List<InsertRowStatement>> 
tableModelDatabaseInsertRowStatementMap =
         new HashMap<>();
 
-    for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) {
-      final InsertBaseStatement statement = binaryReq.constructStatement();
-      if (statement.isEmpty()) {
-        continue;
-      }
-      if (statement.isWriteToTable()) {
-        if (statement instanceof InsertRowStatement) {
-          tableModelDatabaseInsertRowStatementMap
-              .computeIfAbsent(statement.getDatabaseName().get(), k -> new 
ArrayList<>())
-              .add((InsertRowStatement) statement);
-        } else if (statement instanceof InsertTabletStatement) {
-          statements.add(statement);
-        } else if (statement instanceof InsertRowsStatement) {
-          tableModelDatabaseInsertRowStatementMap
-              .computeIfAbsent(statement.getDatabaseName().get(), k -> new 
ArrayList<>())
-              .addAll(((InsertRowsStatement) 
statement).getInsertRowStatementList());
-        } else {
-          throw new UnsupportedOperationException(
-              String.format(
-                  "unknown InsertBaseStatement %s constructed from 
PipeTransferTabletBinaryReqV2.",
-                  binaryReq));
-        }
-        continue;
-      }
-      if (statement instanceof InsertRowStatement) {
-        insertRowStatementList.add((InsertRowStatement) statement);
-      } else if (statement instanceof InsertTabletStatement) {
-        insertTabletStatementList.add((InsertTabletStatement) statement);
-      } else if (statement instanceof InsertRowsStatement) {
-        insertRowStatementList.addAll(
-            ((InsertRowsStatement) statement).getInsertRowStatementList());
-      } else {
-        throw new UnsupportedOperationException(
-            String.format(
-                "unknown InsertBaseStatement %s constructed from 
PipeTransferTabletBinaryReqV2.",
-                binaryReq));
-      }
-    }
-
     for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq : 
insertNodeReqs) {
       final InsertBaseStatement statement = insertNodeReq.constructStatement();
       if (statement.isEmpty()) {
@@ -180,10 +139,8 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletBatchReqV2 toTPipeTransferReq(
-      final List<ByteBuffer> binaryBuffers,
       final List<ByteBuffer> insertNodeBuffers,
       final List<ByteBuffer> tabletBuffers,
-      final List<String> binaryDataBases,
       final List<String> insertNodeDataBases,
       final List<String> tabletDataBases)
       throws IOException {
@@ -193,13 +150,8 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
     batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
-      for (int i = 0; i < binaryBuffers.size(); i++) {
-        final ByteBuffer binaryBuffer = binaryBuffers.get(i);
-        ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
-        outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
-        ReadWriteIOUtils.write(binaryDataBases.get(i), outputStream);
-      }
+      // Binary buffer count: always 0 now, still written for rolling-upgrade compatibility
+      ReadWriteIOUtils.write(0, outputStream);
 
       ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
       for (int i = 0; i < insertNodeBuffers.size(); i++) {
@@ -226,17 +178,10 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
       final org.apache.iotdb.service.rpc.thrift.TPipeTransferReq transferReq) {
     final PipeTransferTabletBatchReqV2 batchReq = new 
PipeTransferTabletBatchReqV2();
 
-    int size = ReadWriteIOUtils.readInt(transferReq.body);
-    for (int i = 0; i < size; ++i) {
-      final int length = ReadWriteIOUtils.readInt(transferReq.body);
-      final byte[] body = new byte[length];
-      transferReq.body.get(body);
-      batchReq.binaryReqs.add(
-          PipeTransferTabletBinaryReqV2.toTPipeTransferBinaryReq(
-              ByteBuffer.wrap(body), 
ReadWriteIOUtils.readString(transferReq.body)));
-    }
+    // Binary req count, kept for rolling upgrade; NOTE(review): non-zero counts are not skipped
+    ReadWriteIOUtils.readInt(transferReq.body);
 
-    size = ReadWriteIOUtils.readInt(transferReq.body);
+    int size = ReadWriteIOUtils.readInt(transferReq.body);
     for (int i = 0; i < size; ++i) {
       batchReq.insertNodeReqs.add(
           PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
@@ -258,11 +203,6 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
 
   /////////////////////////////// TestOnly ///////////////////////////////
 
-  @TestOnly
-  public List<PipeTransferTabletBinaryReqV2> getBinaryReqs() {
-    return binaryReqs;
-  }
-
   @TestOnly
   public List<PipeTransferTabletInsertNodeReqV2> getInsertNodeReqs() {
     return insertNodeReqs;
@@ -284,8 +224,7 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
       return false;
     }
     final PipeTransferTabletBatchReqV2 that = (PipeTransferTabletBatchReqV2) 
obj;
-    return Objects.equals(binaryReqs, that.binaryReqs)
-        && Objects.equals(insertNodeReqs, that.insertNodeReqs)
+    return Objects.equals(insertNodeReqs, that.insertNodeReqs)
         && Objects.equals(tabletReqs, that.tabletReqs)
         && version == that.version
         && type == that.type
@@ -294,6 +233,6 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
 
   @Override
   public int hashCode() {
-    return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, 
body);
+    return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 5bee20c4dc0..622f4e4f0cd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -31,7 +31,6 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -234,20 +233,14 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
   private void doTransfer(
       final AirGapSocket socket,
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
-      throws PipeException, WALPipeException, IOException {
+      throws PipeException, IOException {
     final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
     final byte[] bytes =
-        Objects.isNull(insertNode)
-            ? PipeTransferTabletBinaryReqV2.toTPipeTransferBytes(
-                pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
-                pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
-                    ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
-                    : null)
-            : PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(
-                insertNode,
-                pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
-                    ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
-                    : null);
+        PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(
+            insertNode,
+            pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
+                ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
+                : null);
 
     if (!send(
         pipeInsertNodeTabletInsertionEvent.getPipeName(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
index 7cdeef9e826..6912290bb2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
@@ -51,7 +51,6 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensu
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensusV2TsFileInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2AsyncBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq;
-import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
@@ -70,7 +69,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -324,15 +322,8 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink 
implements ConsensusPipeS
     final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
     final ProgressIndex progressIndex = 
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
     final TIoTConsensusV2TransferReq iotConsensusV2TransferReq =
-        Objects.isNull(insertNode)
-            ? IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq(
-                pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
-                tCommitId,
-                tConsensusGroupId,
-                progressIndex,
-                thisDataNodeId)
-            : IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
-                insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId);
+        IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
+            insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId);
     final IoTConsensusV2TabletInsertNodeEventHandler 
iotConsensusV2InsertNodeReqHandler =
         new IoTConsensusV2TabletInsertNodeEventHandler(
             pipeInsertNodeTabletInsertionEvent,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
index 5866a5ceeaa..e6fe44b3467 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
@@ -40,7 +40,6 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2SyncBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq;
-import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq;
@@ -326,21 +325,10 @@ public class IoTConsensusV2SyncSink extends IoTDBSink {
       insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
       progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
 
-      if (insertNode != null) {
-        resp =
-            syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer(
-                IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
-                    insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId));
-      } else {
-        resp =
-            syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer(
-                IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq(
-                    pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
-                    tCommitId,
-                    tConsensusGroupId,
-                    progressIndex,
-                    thisDataNodeId));
-      }
+      resp =
+          syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer(
+              IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
+                  insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId));
     } catch (final Exception e) {
       throw new PipeRuntimeSinkRetryTimesConfigurableException(
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
index 4e0cb17f270..20ba2e0552e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
@@ -28,7 +28,6 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBatchReq;
-import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -201,17 +200,10 @@ public abstract class 
IoTConsensusV2TransferBatchReqBuilder implements AutoClose
     final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
     // IoTConsensusV2 will transfer binary data to TIoTConsensusV2TransferReq
     final ProgressIndex progressIndex = 
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
-    if (Objects.isNull(insertNode)) {
-      buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
-      batchReqs.add(
-          IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq(
-              buffer, commitId, consensusGroupId, progressIndex, 
thisDataNodeId));
-    } else {
-      buffer = insertNode.serializeToByteBuffer();
-      batchReqs.add(
-          IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
-              insertNode, commitId, consensusGroupId, progressIndex, 
thisDataNodeId));
-    }
+    buffer = insertNode.serializeToByteBuffer();
+    batchReqs.add(
+        IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
+            insertNode, commitId, consensusGroupId, progressIndex, 
thisDataNodeId));
 
     return buffer.limit();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index f8d0b104096..1a4d7cf3863 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -40,7 +40,6 @@ import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatc
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
@@ -296,10 +295,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               : null;
       final TPipeTransferReq pipeTransferReq =
           compressIfNeeded(
-              Objects.isNull(insertNode)
-                  ? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
-                      pipeInsertNodeTabletInsertionEvent.getByteBuffer(), 
databaseName)
-                  : 
PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode, databaseName));
+              PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode, 
databaseName));
       final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler =
           new PipeTransferTabletInsertNodeEventHandler(
               pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 016f787afaa..d711c65b9e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -40,7 +40,6 @@ import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlai
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -381,17 +380,11 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
       final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
       final TPipeTransferReq req =
           compressIfNeeded(
-              insertNode != null
-                  ? PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(
-                      insertNode,
-                      pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
-                          ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
-                          : null)
-                  : PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
-                      pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
-                      pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
-                          ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
-                          : null));
+              PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(
+                  insertNode,
+                  pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
+                      ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
+                      : null));
       rateLimitIfNeeded(
           pipeInsertNodeTabletInsertionEvent.getPipeName(),
           pipeInsertNodeTabletInsertionEvent.getCreationTime(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 07b88a98053..b5dccb55c88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
 import org.apache.iotdb.db.protocol.session.IClientSession;
@@ -271,16 +270,9 @@ public class WriteBackSink implements PipeConnector {
             : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
 
     final InsertBaseStatement insertBaseStatement;
-    if (Objects.isNull(insertNode)) {
-      insertBaseStatement =
-          PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
-                  pipeInsertNodeTabletInsertionEvent.getByteBuffer(), 
dataBaseName)
-              .constructStatement();
-    } else {
-      insertBaseStatement =
-          PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode, 
dataBaseName)
-              .constructStatement();
-    }
+    insertBaseStatement =
+        PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode, 
dataBaseName)
+            .constructStatement();
 
     final TSStatus status =
         insertBaseStatement.isWriteToTable()
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 0cc4470882e..5a4c461b473 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -309,7 +309,6 @@ public class PipeDataNodeThriftRequestTest {
 
   @Test
   public void testPipeTransferTabletBatchReq() throws IOException {
-    final List<ByteBuffer> binaryBuffers = new ArrayList<>();
     final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
     final List<ByteBuffer> tabletBuffers = new ArrayList<>();
 
@@ -327,10 +326,6 @@ public class PipeDataNodeThriftRequestTest {
     // InsertNode buffer
     insertNodeBuffers.add(node.serializeToByteBuffer());
 
-    // Binary buffer
-    // Not do real test here since "serializeToWal" needs private inner class 
of walBuffer
-    binaryBuffers.add(ByteBuffer.wrap(new byte[] {'a', 'b'}));
-
     // Raw buffer
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
@@ -367,8 +362,7 @@ public class PipeDataNodeThriftRequestTest {
     }
 
     final PipeTransferTabletBatchReq req =
-        PipeTransferTabletBatchReq.toTPipeTransferReq(
-            binaryBuffers, insertNodeBuffers, tabletBuffers);
+        PipeTransferTabletBatchReq.toTPipeTransferReq(insertNodeBuffers, 
tabletBuffers);
 
     final PipeTransferTabletBatchReq deserializedReq =
         PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
@@ -455,13 +449,10 @@ public class PipeDataNodeThriftRequestTest {
     final PipeTransferTabletBatchReqV2 deserializedReq =
         PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req);
 
-    Assert.assertArrayEquals(
-        new byte[] {'a', 'b'}, 
deserializedReq.getBinaryReqs().get(0).getByteBuffer().array());
     Assert.assertEquals(node, 
deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
     Assert.assertEquals(t, deserializedReq.getTabletReqs().get(0).getTablet());
     Assert.assertTrue(deserializedReq.getTabletReqs().get(0).getIsAligned());
 
-    Assert.assertEquals("test", 
deserializedReq.getBinaryReqs().get(0).getDataBaseName());
     Assert.assertEquals("test", 
deserializedReq.getTabletReqs().get(0).getDataBaseName());
     Assert.assertEquals("test", 
deserializedReq.getInsertNodeReqs().get(0).getDataBaseName());
   }

Reply via email to