This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 044620e7f0b [To dev/1.3] Pipe: Fixed the log of disruptor queue &
deleted the useless binary buffer (#17341) (#17359)
044620e7f0b is described below
commit 044620e7f0b0893d27ad279986840bc43b15ad19
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 20:05:36 2026 +0800
[To dev/1.3] Pipe: Fixed the log of disruptor queue & deleted the useless
binary buffer (#17341) (#17359)
* wz
* fx
---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 22 ++-------
.../evolvable/batch/PipeTabletEventPlainBatch.java | 31 ++-----------
.../request/PipeTransferTabletBatchReq.java | 54 ++++------------------
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 9 +---
.../pipeconsensus/PipeConsensusAsyncSink.java | 13 +-----
.../pipeconsensus/PipeConsensusSyncSink.java | 20 ++------
.../PipeConsensusTransferBatchReqBuilder.java | 16 ++-----
.../thrift/async/IoTDBDataRegionAsyncSink.java | 7 +--
.../thrift/sync/IoTDBDataRegionSyncSink.java | 7 +--
.../sink/protocol/writeback/WriteBackSink.java | 21 ++-------
.../realtime/assigner/DisruptorQueue.java | 4 +-
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 3 +-
12 files changed, 37 insertions(+), 170 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index ade4d67b2dd..bc7040a0598 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -37,7 +37,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.collector.TabletCollector;
@@ -51,7 +50,8 @@ import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -102,18 +102,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
this.allocatedMemoryBlock = new AtomicReference<>();
}
+ @Nonnull
public InsertNode getInsertNode() {
return insertNode;
}
- public ByteBuffer getByteBuffer() throws WALPipeException {
- final InsertNode node = insertNode;
- if (Objects.isNull(node)) {
- throw new PipeException("InsertNode has been released");
- }
- return node.serializeToByteBuffer();
- }
-
public String getDeviceId() {
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
@@ -214,9 +207,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
public boolean mayEventTimeOverlappedWithTimeRange() {
try {
final InsertNode insertNode = getInsertNode();
- if (Objects.isNull(insertNode)) {
- return true;
- }
if (insertNode instanceof InsertRowNode) {
final long timestamp = ((InsertRowNode) insertNode).getTime();
@@ -258,9 +248,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
public boolean mayEventPathsOverlappedWithPattern() {
try {
final InsertNode insertNode = getInsertNode();
- if (Objects.isNull(insertNode)) {
- return true;
- }
if (insertNode instanceof InsertRowNode || insertNode instanceof
InsertTabletNode) {
final PartialPath devicePartialPath = insertNode.getDevicePath();
@@ -355,9 +342,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
dataContainers = new ArrayList<>();
final InsertNode node = getInsertNode();
- if (Objects.isNull(node)) {
- throw new PipeException("InsertNode has been released");
- }
switch (node.getType()) {
case INSERT_ROW:
case INSERT_TABLET:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index d450be8200c..837d4db639a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -24,14 +24,11 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -40,23 +37,14 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventBatch.class);
-
- private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
// Used to rate limit when transferring data
private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new
HashMap<>();
- PipeTabletEventPlainBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
- super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
- }
-
PipeTabletEventPlainBatch(
final int maxDelayInMs,
final long requestMaxBatchSizeInBytes,
@@ -65,8 +53,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
}
@Override
- protected boolean constructBatch(final TabletInsertionEvent event)
- throws WALPipeException, IOException {
+ protected boolean constructBatch(final TabletInsertionEvent event) throws
IOException {
final int bufferSize = buildTabletInsertionBuffer(event);
totalBufferSize += bufferSize;
pipe2BytesAccumulated.compute(
@@ -81,7 +68,6 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
public synchronized void onSuccess() {
super.onSuccess();
- binaryBuffers.clear();
insertNodeBuffers.clear();
tabletBuffers.clear();
@@ -89,8 +75,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
}
public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
- return PipeTransferTabletBatchReq.toTPipeTransferReq(
- binaryBuffers, insertNodeBuffers, tabletBuffers);
+ return PipeTransferTabletBatchReq.toTPipeTransferReq(insertNodeBuffers,
tabletBuffers);
}
public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
@@ -101,8 +86,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
return pipe2BytesAccumulated;
}
- private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
- throws IOException, WALPipeException {
+ private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
throws IOException {
final ByteBuffer buffer;
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
@@ -110,13 +94,8 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
// Read the bytebuffer from the wal file and transfer it directly
without serializing or
// deserializing if possible
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
- if (Objects.isNull(insertNode)) {
- buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
- binaryBuffers.add(buffer);
- } else {
- buffer = insertNode.serializeToByteBuffer();
- insertNodeBuffers.add(buffer);
- }
+ buffer = insertNode.serializeToByteBuffer();
+ insertNodeBuffers.add(buffer);
} else {
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) event;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 48bd1016763..94a838ee0ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -45,7 +45,6 @@ import java.util.Objects;
public class PipeTransferTabletBatchReq extends TPipeTransferReq {
- private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new
ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs
= new ArrayList<>();
private final transient List<PipeTransferTabletRawReq> tabletReqs = new
ArrayList<>();
@@ -61,26 +60,6 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new
ArrayList<>();
- for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
- final InsertBaseStatement statement = binaryReq.constructStatement();
- if (statement.isEmpty()) {
- continue;
- }
- if (statement instanceof InsertRowStatement) {
- insertRowStatementList.add((InsertRowStatement) statement);
- } else if (statement instanceof InsertTabletStatement) {
- insertTabletStatementList.add((InsertTabletStatement) statement);
- } else if (statement instanceof InsertRowsStatement) {
- insertRowStatementList.addAll(
- ((InsertRowsStatement) statement).getInsertRowStatementList());
- } else {
- throw new UnsupportedOperationException(
- String.format(
- "unknown InsertBaseStatement %s constructed from
PipeTransferTabletBinaryReq.",
- binaryReq));
- }
- }
-
for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs)
{
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
@@ -117,9 +96,7 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletBatchReq toTPipeTransferReq(
- final List<ByteBuffer> binaryBuffers,
- final List<ByteBuffer> insertNodeBuffers,
- final List<ByteBuffer> tabletBuffers)
+ final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer>
tabletBuffers)
throws IOException {
final PipeTransferTabletBatchReq batchReq = new
PipeTransferTabletBatchReq();
@@ -130,11 +107,8 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
- for (final ByteBuffer binaryBuffer : binaryBuffers) {
- ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
- outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
- }
+ // Binary req, for rolling upgrading
+ ReadWriteIOUtils.write(0, outputStream);
ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
@@ -157,16 +131,10 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
final TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReq batchReq = new
PipeTransferTabletBatchReq();
- int size = ReadWriteIOUtils.readInt(transferReq.body);
- for (int i = 0; i < size; ++i) {
- final int length = ReadWriteIOUtils.readInt(transferReq.body);
- final byte[] body = new byte[length];
- transferReq.body.get(body);
- batchReq.binaryReqs.add(
-
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
- }
+ // Binary req, for rolling upgrading
+ ReadWriteIOUtils.readInt(transferReq.body);
- size = ReadWriteIOUtils.readInt(transferReq.body);
+ int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
@@ -188,11 +156,6 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
/////////////////////////////// TestOnly ///////////////////////////////
- @TestOnly
- public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
- return binaryReqs;
- }
-
@TestOnly
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
return insertNodeReqs;
@@ -214,8 +177,7 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
return false;
}
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
- return binaryReqs.equals(that.binaryReqs)
- && insertNodeReqs.equals(that.insertNodeReqs)
+ return insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
&& version == that.version
&& type == that.type
@@ -224,6 +186,6 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
@Override
public int hashCode() {
- return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type,
body);
+ return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index a2f09cf5e50..918cf24fce5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
-import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -194,13 +193,9 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
private void doTransfer(
final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
- throws PipeException, WALPipeException, IOException {
+ throws PipeException, IOException {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
- final byte[] bytes =
- Objects.isNull(insertNode)
- ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer())
- : PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
+ final byte[] bytes =
PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
if (!send(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java
index a34781dee2e..47380ec799d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java
@@ -48,7 +48,6 @@ import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler.PipeConsensu
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler.PipeConsensusTsFileInsertionEventHandler;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
-import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -64,7 +63,6 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
-import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -280,15 +278,8 @@ public class PipeConsensusAsyncSink extends IoTDBSink
implements ConsensusPipeSi
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
final ProgressIndex progressIndex =
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
final TPipeConsensusTransferReq pipeConsensusTransferReq =
- Objects.isNull(insertNode)
- ? PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
- tCommitId,
- tConsensusGroupId,
- progressIndex,
- thisDataNodeId)
- : PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
- insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId);
+ PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+ insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId);
final PipeConsensusTabletInsertNodeEventHandler
pipeConsensusInsertNodeReqHandler =
new PipeConsensusTabletInsertNodeEventHandler(
pipeInsertNodeTabletInsertionEvent,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
index b059e484734..3d992932e9d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
@@ -38,7 +38,6 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.builder.PipeConsensusSyncBatchReqBuilder;
-import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
@@ -255,21 +254,10 @@ public class PipeConsensusSyncSink extends IoTDBSink {
insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
- if (insertNode != null) {
- resp =
- syncPipeConsensusServiceClient.pipeConsensusTransfer(
- PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
- insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId));
- } else {
- resp =
- syncPipeConsensusServiceClient.pipeConsensusTransfer(
- PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
- tCommitId,
- tConsensusGroupId,
- progressIndex,
- thisDataNodeId));
- }
+ resp =
+ syncPipeConsensusServiceClient.pipeConsensusTransfer(
+ PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+ insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId));
} catch (final Exception e) {
throw new PipeConnectionException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index eb5ce6fe48c..e13e1c1a802 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -28,7 +28,6 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBatchReq;
-import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -199,17 +198,10 @@ public abstract class
PipeConsensusTransferBatchReqBuilder implements AutoClosea
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
// PipeConsensus will transfer binary data to TPipeConsensusTransferReq
final ProgressIndex progressIndex =
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
- if (Objects.isNull(insertNode)) {
- buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
- batchReqs.add(
- PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
- buffer, commitId, consensusGroupId, progressIndex,
thisDataNodeId));
- } else {
- buffer = insertNode.serializeToByteBuffer();
- batchReqs.add(
- PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
- insertNode, commitId, consensusGroupId, progressIndex,
thisDataNodeId));
- }
+ buffer = insertNode.serializeToByteBuffer();
+ batchReqs.add(
+ PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+ insertNode, commitId, consensusGroupId, progressIndex,
thisDataNodeId));
return buffer.limit();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index f8e0684188a..6dc9a695b90 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -39,7 +39,6 @@ import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatc
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
-import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
@@ -281,11 +280,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
final TPipeTransferReq pipeTransferReq =
- compressIfNeeded(
- Objects.isNull(insertNode)
- ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer())
- :
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
+
compressIfNeeded(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
final PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler =
new PipeTransferTabletInsertNodeEventHandler(
pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index a40074392f4..a13f40b1b83 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -39,7 +39,6 @@ import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatc
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
-import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -319,11 +318,7 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
final TPipeTransferReq req =
- compressIfNeeded(
- insertNode != null
- ?
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)
- : PipeTransferTabletBinaryReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
+
compressIfNeeded(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
rateLimitIfNeeded(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
pipeInsertNodeTabletInsertionEvent.getCreationTime(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 80c9e09f9ac..21976f63ab6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -22,12 +22,10 @@ package org.apache.iotdb.db.pipe.sink.protocol.writeback;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
-import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -53,7 +51,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
-import java.util.Objects;
public class WriteBackSink implements PipeConnector {
@@ -125,23 +122,13 @@ public class WriteBackSink implements PipeConnector {
private void doTransfer(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
- throws PipeException, WALPipeException {
+ throws PipeException {
final TSStatus status;
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
- if (Objects.isNull(insertNode)) {
- status =
- PipeDataNodeAgent.receiver()
- .thrift()
- .receive(
- PipeTransferTabletBinaryReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer()))
- .getStatus();
- } else {
- final InsertBaseStatement statement =
-
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
- status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS :
executeStatement(statement);
- }
+ final InsertBaseStatement statement =
+
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
+ status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS :
executeStatement(statement);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index c7add196e69..2a2fa110749 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -112,9 +112,9 @@ public class DisruptorQueue {
private void mayPrintExceedingLog() {
final long remainingCapacity = ringBuffer.remainingCapacity();
final long bufferSize = ringBuffer.getBufferSize();
- if ((double) remainingCapacity / bufferSize >= 0.5
+ if ((double) remainingCapacity / bufferSize <= 0.5
&& System.currentTimeMillis()
- -
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds()
+ -
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds() * 1000L
>= lastLogTime) {
LOGGER.warn(
"The assigner queue content has exceeded half, it may be stuck and
may block insertion. regionId: {}, capacity: {}, bufferSize: {}",
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 9b7e38f629e..c69b4f53a46 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -253,8 +253,7 @@ public class PipeDataNodeThriftRequestTest {
}
final PipeTransferTabletBatchReq req =
- PipeTransferTabletBatchReq.toTPipeTransferReq(
- binaryBuffers, insertNodeBuffers, tabletBuffers);
+ PipeTransferTabletBatchReq.toTPipeTransferReq(insertNodeBuffers,
tabletBuffers);
final PipeTransferTabletBatchReq deserializedReq =
PipeTransferTabletBatchReq.fromTPipeTransferReq(req);