This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch opc-fix-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-fix-fix by this push:
new bb87e11a538 fix
bb87e11a538 is described below
commit bb87e11a538d9256a81aac75c558b6f2e6cd2e86
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 23 19:12:24 2026 +0800
fix
---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 16 +----
.../evolvable/batch/PipeTabletEventPlainBatch.java | 11 +---
.../request/PipeTransferTabletBatchReq.java | 5 +-
.../request/PipeTransferTabletBatchReqV2.java | 75 ++--------------------
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 19 ++----
.../iotconsensusv2/IoTConsensusV2AsyncSink.java | 13 +---
.../iotconsensusv2/IoTConsensusV2SyncSink.java | 20 ++----
.../IoTConsensusV2TransferBatchReqBuilder.java | 16 ++---
.../thrift/async/IoTDBDataRegionAsyncSink.java | 6 +-
.../thrift/sync/IoTDBDataRegionSyncSink.java | 17 ++---
.../sink/protocol/writeback/WriteBackSink.java | 14 +---
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 11 +---
12 files changed, 39 insertions(+), 184 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index b576ac507b8..1cb50747479 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -53,7 +53,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
-import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.collector.TabletCollector;
@@ -69,7 +68,8 @@ import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -159,18 +159,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
this.allocatedMemoryBlock = new AtomicReference<>();
}
+ @Nonnull
public InsertNode getInsertNode() {
return insertNode;
}
- public ByteBuffer getByteBuffer() throws WALPipeException {
- final InsertNode node = insertNode;
- if (Objects.isNull(node)) {
- throw new PipeException("InsertNode has been released");
- }
- return node.serializeToByteBuffer();
- }
-
public String getDeviceId() {
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
@@ -399,9 +392,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
public boolean mayEventPathsOverlappedWithPattern() {
try {
final InsertNode insertNode = getInsertNode();
- if (Objects.isNull(insertNode)) {
- return true;
- }
if (insertNode instanceof RelationalInsertRowNode
|| insertNode instanceof RelationalInsertTabletNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 0db905a6dc0..065ad3be840 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -51,12 +51,10 @@ import java.util.concurrent.atomic.AtomicLong;
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
- private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
- private final List<String> binaryDataBases = new ArrayList<>();
private final List<String> insertNodeDataBases = new ArrayList<>();
private final List<String> tabletDataBases = new ArrayList<>();
@@ -90,11 +88,9 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
public synchronized void onSuccess() {
super.onSuccess();
- binaryBuffers.clear();
insertNodeBuffers.clear();
tabletBuffers.clear();
- binaryDataBases.clear();
insertNodeDataBases.clear();
tabletDataBases.clear();
tableModelTabletMap.clear();
@@ -142,12 +138,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
tableModelTabletMap.clear();
return PipeTransferTabletBatchReqV2.toTPipeTransferReq(
- binaryBuffers,
- insertNodeBuffers,
- tabletBuffers,
- binaryDataBases,
- insertNodeDataBases,
- tabletDataBases);
+ insertNodeBuffers, tabletBuffers, insertNodeDataBases,
tabletDataBases);
}
public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 4c426dfbaca..8d75e9864bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -96,12 +96,11 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletBatchReq toTPipeTransferReq(
- final List<ByteBuffer> insertNodeBuffers,
- final List<ByteBuffer> tabletBuffers)
+ final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer>
tabletBuffers)
throws IOException {
final PipeTransferTabletBatchReq batchReq = new
PipeTransferTabletBatchReq();
- // batchReq.binaryReqs, batchReq.insertNodeReqs, batchReq.tabletReqs are
empty
+ // batchReq.insertNodeReqs, batchReq.tabletReqs are empty
// when this method is called from
PipeTransferTabletBatchReqBuilder.toTPipeTransferReq()
batchReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index c136ffbe7d3..f626d496b55 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -44,8 +44,6 @@ import java.util.Map;
import java.util.Objects;
public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq {
-
- private final transient List<PipeTransferTabletBinaryReqV2> binaryReqs = new
ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReqV2>
insertNodeReqs =
new ArrayList<>();
private final transient List<PipeTransferTabletRawReqV2> tabletReqs = new
ArrayList<>();
@@ -66,45 +64,6 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
final Map<String, List<InsertRowStatement>>
tableModelDatabaseInsertRowStatementMap =
new HashMap<>();
- for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) {
- final InsertBaseStatement statement = binaryReq.constructStatement();
- if (statement.isEmpty()) {
- continue;
- }
- if (statement.isWriteToTable()) {
- if (statement instanceof InsertRowStatement) {
- tableModelDatabaseInsertRowStatementMap
- .computeIfAbsent(statement.getDatabaseName().get(), k -> new
ArrayList<>())
- .add((InsertRowStatement) statement);
- } else if (statement instanceof InsertTabletStatement) {
- statements.add(statement);
- } else if (statement instanceof InsertRowsStatement) {
- tableModelDatabaseInsertRowStatementMap
- .computeIfAbsent(statement.getDatabaseName().get(), k -> new
ArrayList<>())
- .addAll(((InsertRowsStatement)
statement).getInsertRowStatementList());
- } else {
- throw new UnsupportedOperationException(
- String.format(
- "unknown InsertBaseStatement %s constructed from
PipeTransferTabletBinaryReqV2.",
- binaryReq));
- }
- continue;
- }
- if (statement instanceof InsertRowStatement) {
- insertRowStatementList.add((InsertRowStatement) statement);
- } else if (statement instanceof InsertTabletStatement) {
- insertTabletStatementList.add((InsertTabletStatement) statement);
- } else if (statement instanceof InsertRowsStatement) {
- insertRowStatementList.addAll(
- ((InsertRowsStatement) statement).getInsertRowStatementList());
- } else {
- throw new UnsupportedOperationException(
- String.format(
- "unknown InsertBaseStatement %s constructed from
PipeTransferTabletBinaryReqV2.",
- binaryReq));
- }
- }
-
for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq :
insertNodeReqs) {
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
@@ -180,10 +139,8 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletBatchReqV2 toTPipeTransferReq(
- final List<ByteBuffer> binaryBuffers,
final List<ByteBuffer> insertNodeBuffers,
final List<ByteBuffer> tabletBuffers,
- final List<String> binaryDataBases,
final List<String> insertNodeDataBases,
final List<String> tabletDataBases)
throws IOException {
@@ -193,13 +150,8 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
- for (int i = 0; i < binaryBuffers.size(); i++) {
- final ByteBuffer binaryBuffer = binaryBuffers.get(i);
- ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
- outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
- ReadWriteIOUtils.write(binaryDataBases.get(i), outputStream);
- }
+ // Binary buffer, for rolling upgrade
+ ReadWriteIOUtils.write(0, outputStream);
ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
for (int i = 0; i < insertNodeBuffers.size(); i++) {
@@ -226,17 +178,10 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
final org.apache.iotdb.service.rpc.thrift.TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReqV2 batchReq = new
PipeTransferTabletBatchReqV2();
- int size = ReadWriteIOUtils.readInt(transferReq.body);
- for (int i = 0; i < size; ++i) {
- final int length = ReadWriteIOUtils.readInt(transferReq.body);
- final byte[] body = new byte[length];
- transferReq.body.get(body);
- batchReq.binaryReqs.add(
- PipeTransferTabletBinaryReqV2.toTPipeTransferBinaryReq(
- ByteBuffer.wrap(body),
ReadWriteIOUtils.readString(transferReq.body)));
- }
+ // Binary req, for rolling upgrade
+ ReadWriteIOUtils.readInt(transferReq.body);
- size = ReadWriteIOUtils.readInt(transferReq.body);
+ int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
@@ -258,11 +203,6 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
/////////////////////////////// TestOnly ///////////////////////////////
- @TestOnly
- public List<PipeTransferTabletBinaryReqV2> getBinaryReqs() {
- return binaryReqs;
- }
-
@TestOnly
public List<PipeTransferTabletInsertNodeReqV2> getInsertNodeReqs() {
return insertNodeReqs;
@@ -284,8 +224,7 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
return false;
}
final PipeTransferTabletBatchReqV2 that = (PipeTransferTabletBatchReqV2)
obj;
- return Objects.equals(binaryReqs, that.binaryReqs)
- && Objects.equals(insertNodeReqs, that.insertNodeReqs)
+ return Objects.equals(insertNodeReqs, that.insertNodeReqs)
&& Objects.equals(tabletReqs, that.tabletReqs)
&& version == that.version
&& type == that.type
@@ -294,6 +233,6 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
@Override
public int hashCode() {
- return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type,
body);
+ return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 5bee20c4dc0..622f4e4f0cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -31,7 +31,6 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
-import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -234,20 +233,14 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
private void doTransfer(
final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
- throws PipeException, WALPipeException, IOException {
+ throws PipeException, IOException {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
final byte[] bytes =
- Objects.isNull(insertNode)
- ? PipeTransferTabletBinaryReqV2.toTPipeTransferBytes(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
- pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
- ?
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
- : null)
- : PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(
- insertNode,
- pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
- ?
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
- : null);
+ PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(
+ insertNode,
+ pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
+ ?
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
+ : null);
if (!send(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
index 7cdeef9e826..6912290bb2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
@@ -51,7 +51,6 @@ import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensu
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler.IoTConsensusV2TsFileInsertionEventHandler;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2AsyncBatchReqBuilder;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq;
-import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.annotation.TableModel;
@@ -70,7 +69,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
-import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
@@ -324,15 +322,8 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
final ProgressIndex progressIndex =
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
final TIoTConsensusV2TransferReq iotConsensusV2TransferReq =
- Objects.isNull(insertNode)
- ? IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
- tCommitId,
- tConsensusGroupId,
- progressIndex,
- thisDataNodeId)
- : IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
- insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId);
+ IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
+ insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId);
final IoTConsensusV2TabletInsertNodeEventHandler
iotConsensusV2InsertNodeReqHandler =
new IoTConsensusV2TabletInsertNodeEventHandler(
pipeInsertNodeTabletInsertionEvent,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
index 5866a5ceeaa..e6fe44b3467 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2SyncBatchReqBuilder;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq;
-import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq;
@@ -326,21 +325,10 @@ public class IoTConsensusV2SyncSink extends IoTDBSink {
insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
- if (insertNode != null) {
- resp =
- syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer(
- IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
- insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId));
- } else {
- resp =
- syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer(
- IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
- tCommitId,
- tConsensusGroupId,
- progressIndex,
- thisDataNodeId));
- }
+ resp =
+ syncIoTConsensusV2ServiceClient.iotConsensusV2Transfer(
+ IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
+ insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId));
} catch (final Exception e) {
throw new PipeRuntimeSinkRetryTimesConfigurableException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
index 4e0cb17f270..20ba2e0552e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
@@ -28,7 +28,6 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBatchReq;
-import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -201,17 +200,10 @@ public abstract class
IoTConsensusV2TransferBatchReqBuilder implements AutoClose
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
// IoTConsensusV2 will transfer binary data to TIoTConsensusV2TransferReq
final ProgressIndex progressIndex =
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
- if (Objects.isNull(insertNode)) {
- buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
- batchReqs.add(
- IoTConsensusV2TabletBinaryReq.toTIoTConsensusV2TransferReq(
- buffer, commitId, consensusGroupId, progressIndex,
thisDataNodeId));
- } else {
- buffer = insertNode.serializeToByteBuffer();
- batchReqs.add(
- IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
- insertNode, commitId, consensusGroupId, progressIndex,
thisDataNodeId));
- }
+ buffer = insertNode.serializeToByteBuffer();
+ batchReqs.add(
+ IoTConsensusV2TabletInsertNodeReq.toTIoTConsensusV2TransferReq(
+ insertNode, commitId, consensusGroupId, progressIndex,
thisDataNodeId));
return buffer.limit();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index f8d0b104096..1a4d7cf3863 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatc
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
-import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
@@ -296,10 +295,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
: null;
final TPipeTransferReq pipeTransferReq =
compressIfNeeded(
- Objects.isNull(insertNode)
- ? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
databaseName)
- :
PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode, databaseName));
+ PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode,
databaseName));
final PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler =
new PipeTransferTabletInsertNodeEventHandler(
pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 016f787afaa..d711c65b9e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlai
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
-import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -381,17 +380,11 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
final TPipeTransferReq req =
compressIfNeeded(
- insertNode != null
- ? PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(
- insertNode,
- pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
- ?
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
- : null)
- : PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
- pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
- ?
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
- : null));
+ PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(
+ insertNode,
+ pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
+ ?
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
+ : null));
rateLimitIfNeeded(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
pipeInsertNodeTabletInsertionEvent.getCreationTime(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 07b88a98053..b5dccb55c88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
import org.apache.iotdb.db.protocol.session.IClientSession;
@@ -271,16 +270,9 @@ public class WriteBackSink implements PipeConnector {
: TREE_MODEL_DATABASE_NAME_IDENTIFIER;
final InsertBaseStatement insertBaseStatement;
- if (Objects.isNull(insertNode)) {
- insertBaseStatement =
- PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
dataBaseName)
- .constructStatement();
- } else {
- insertBaseStatement =
- PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode,
dataBaseName)
- .constructStatement();
- }
+ insertBaseStatement =
+ PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode,
dataBaseName)
+ .constructStatement();
final TSStatus status =
insertBaseStatement.isWriteToTable()
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 0cc4470882e..5a4c461b473 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -309,7 +309,6 @@ public class PipeDataNodeThriftRequestTest {
@Test
public void testPipeTransferTabletBatchReq() throws IOException {
- final List<ByteBuffer> binaryBuffers = new ArrayList<>();
final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
final List<ByteBuffer> tabletBuffers = new ArrayList<>();
@@ -327,10 +326,6 @@ public class PipeDataNodeThriftRequestTest {
// InsertNode buffer
insertNodeBuffers.add(node.serializeToByteBuffer());
- // Binary buffer
- // Not do real test here since "serializeToWal" needs private inner class
of walBuffer
- binaryBuffers.add(ByteBuffer.wrap(new byte[] {'a', 'b'}));
-
// Raw buffer
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
@@ -367,8 +362,7 @@ public class PipeDataNodeThriftRequestTest {
}
final PipeTransferTabletBatchReq req =
- PipeTransferTabletBatchReq.toTPipeTransferReq(
- binaryBuffers, insertNodeBuffers, tabletBuffers);
+ PipeTransferTabletBatchReq.toTPipeTransferReq(insertNodeBuffers,
tabletBuffers);
final PipeTransferTabletBatchReq deserializedReq =
PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
@@ -455,13 +449,10 @@ public class PipeDataNodeThriftRequestTest {
final PipeTransferTabletBatchReqV2 deserializedReq =
PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req);
- Assert.assertArrayEquals(
- new byte[] {'a', 'b'},
deserializedReq.getBinaryReqs().get(0).getByteBuffer().array());
Assert.assertEquals(node,
deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
Assert.assertEquals(t, deserializedReq.getTabletReqs().get(0).getTablet());
Assert.assertTrue(deserializedReq.getTabletReqs().get(0).getIsAligned());
- Assert.assertEquals("test",
deserializedReq.getBinaryReqs().get(0).getDataBaseName());
Assert.assertEquals("test",
deserializedReq.getTabletReqs().get(0).getDataBaseName());
Assert.assertEquals("test",
deserializedReq.getInsertNodeReqs().get(0).getDataBaseName());
}