This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b0306420a10 Pipe: Fix RPC payload compression in PipeTransferTsFileInsertionEventHandler (#12558)
b0306420a10 is described below

commit b0306420a1079f47a6743087773ba1ae4517d610
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 20 22:41:24 2024 +0800

    Pipe: Fix RPC payload compression in PipeTransferTsFileInsertionEventHandler (#12558)
---
 .../PipeTransferTsFileInsertionEventHandler.java   | 23 +++++++++++-----------
 .../dataregion/DataRegionWatermarkInjector.java    |  2 +-
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 07669de76bf..8d02aeb8fa8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -123,10 +123,13 @@ public class PipeTransferTsFileInsertionEventHandler
       } else if (currentFile == tsFile) {
         isSealSignalSent.set(true);
         client.pipeTransfer(
-            transferMod
-                ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
-                    modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())
-                : 
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()),
+            PipeTransferCompressedReq.toTPipeTransferReq(
+                transferMod
+                    ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+                        modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length())
+                    : PipeTransferTsFileSealReq.toTPipeTransferReq(
+                        tsFile.getName(), tsFile.length()),
+                connector.getCompressors()),
             this);
       }
       return;
@@ -137,13 +140,11 @@ public class PipeTransferTsFileInsertionEventHandler
             ? readBuffer
             : Arrays.copyOfRange(readBuffer, 0, readLength);
     final TPipeTransferReq uncompressedReq =
-        PipeTransferCompressedReq.toTPipeTransferReq(
-            transferMod
-                ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(
-                    currentFile.getName(), position, payload)
-                : PipeTransferTsFilePieceReq.toTPipeTransferReq(
-                    currentFile.getName(), position, payload),
-            connector.getCompressors());
+        transferMod
+            ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(
+                currentFile.getName(), position, payload)
+            : PipeTransferTsFilePieceReq.toTPipeTransferReq(
+                currentFile.getName(), position, payload);
     client.pipeTransfer(
         connector.isRpcCompressionEnabled()
             ? PipeTransferCompressedReq.toTPipeTransferReq(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
index 5d8bbf615ae..a8d95d44810 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
@@ -28,7 +28,7 @@ public class DataRegionWatermarkInjector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DataRegionWatermarkInjector.class);
 
-  public static final long MIN_INJECTION_INTERVAL_IN_MS = 30 * 1000; // 30s
+  public static final long MIN_INJECTION_INTERVAL_IN_MS = 30 * 1000L; // 30s
 
   private final int regionId;
 

Reply via email to