This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b0306420a10 Pipe: Fix RPC payload compression in PipeTransferTsFileInsertionEventHandler (#12558)
b0306420a10 is described below
commit b0306420a1079f47a6743087773ba1ae4517d610
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 20 22:41:24 2024 +0800
Pipe: Fix RPC payload compression in PipeTransferTsFileInsertionEventHandler (#12558)
---
.../PipeTransferTsFileInsertionEventHandler.java | 23 +++++++++++-----------
.../dataregion/DataRegionWatermarkInjector.java | 2 +-
2 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 07669de76bf..8d02aeb8fa8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -123,10 +123,13 @@ public class PipeTransferTsFileInsertionEventHandler
} else if (currentFile == tsFile) {
isSealSignalSent.set(true);
client.pipeTransfer(
- transferMod
- ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
- modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())
- : PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()),
+ PipeTransferCompressedReq.toTPipeTransferReq(
+ transferMod
+ ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+ modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())
+ : PipeTransferTsFileSealReq.toTPipeTransferReq(
+ tsFile.getName(), tsFile.length()),
+ connector.getCompressors()),
this);
}
return;
@@ -137,13 +140,11 @@ public class PipeTransferTsFileInsertionEventHandler
? readBuffer
: Arrays.copyOfRange(readBuffer, 0, readLength);
final TPipeTransferReq uncompressedReq =
- PipeTransferCompressedReq.toTPipeTransferReq(
- transferMod
- ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(
- currentFile.getName(), position, payload)
- : PipeTransferTsFilePieceReq.toTPipeTransferReq(
- currentFile.getName(), position, payload),
- connector.getCompressors());
+ transferMod
+ ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(
+ currentFile.getName(), position, payload)
+ : PipeTransferTsFilePieceReq.toTPipeTransferReq(
+ currentFile.getName(), position, payload);
client.pipeTransfer(
connector.isRpcCompressionEnabled()
? PipeTransferCompressedReq.toTPipeTransferReq(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
index 5d8bbf615ae..a8d95d44810 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
@@ -28,7 +28,7 @@ public class DataRegionWatermarkInjector {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataRegionWatermarkInjector.class);
- public static final long MIN_INJECTION_INTERVAL_IN_MS = 30 * 1000; // 30s
+ public static final long MIN_INJECTION_INTERVAL_IN_MS = 30 * 1000L; // 30s
private final int regionId;