This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-compressor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c048086717253ab7b918ee6aaa52ce81ba4f3358 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 20 20:00:03 2024 +0800 Update PipeTransferTsFileInsertionEventHandler.java --- .../PipeTransferTsFileInsertionEventHandler.java | 23 +++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java index 07669de76bf..8d02aeb8fa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java @@ -123,10 +123,13 @@ public class PipeTransferTsFileInsertionEventHandler } else if (currentFile == tsFile) { isSealSignalSent.set(true); client.pipeTransfer( - transferMod - ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq( - modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()) - : PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()), + PipeTransferCompressedReq.toTPipeTransferReq( + transferMod + ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq( + modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()) + : PipeTransferTsFileSealReq.toTPipeTransferReq( + tsFile.getName(), tsFile.length()), + connector.getCompressors()), this); } return; @@ -137,13 +140,11 @@ public class PipeTransferTsFileInsertionEventHandler ? readBuffer : Arrays.copyOfRange(readBuffer, 0, readLength); final TPipeTransferReq uncompressedReq = - PipeTransferCompressedReq.toTPipeTransferReq( - transferMod - ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq( - currentFile.getName(), position, payload) - : PipeTransferTsFilePieceReq.toTPipeTransferReq( - currentFile.getName(), position, payload), - connector.getCompressors()); + transferMod + ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq( + currentFile.getName(), position, payload) + : PipeTransferTsFilePieceReq.toTPipeTransferReq( + currentFile.getName(), position, payload); client.pipeTransfer( connector.isRpcCompressionEnabled() ? PipeTransferCompressedReq.toTPipeTransferReq(
