This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 902aa35e14c792c4c11a38373ffbc2bea03d8ec8
Author: Caideyipi <[email protected]>
AuthorDate: Thu Aug 7 11:38:57 2025 +0800

    [To dev/1.3] Pipe: Totally banned the receiver conversion (#16086) (#16105)
    
    * Pipe: Totally banned the receiver conversion (#16086)
    
    * logger
    
    * ci-fix
    
    * partial
    
    * Delete client-go
    
    * fix
    
    * rename
    
    * fix-ci
    
    * revert-cp
---
 .../pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java  |  5 ++++-
 .../PipeStatementDataTypeConvertExecutionVisitor.java      |  4 +++-
 .../java/org/apache/iotdb/commons/conf/CommonConfig.java   | 14 ++++++++++++++
 .../org/apache/iotdb/commons/pipe/config/PipeConfig.java   |  5 +++++
 .../apache/iotdb/commons/pipe/config/PipeDescriptor.java   |  5 +++++
 5 files changed, 31 insertions(+), 2 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
index fc2b32e145b..c4198e1a9fe 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
@@ -342,6 +342,8 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeDualManualIT
         senderSession.executeNonQueryStatement("flush");
       } else {
         // Send Tablet data to receiver
+        // Write once to create data regions, guarantee that no any tsFiles 
will be sent
+        consumer.accept(senderSession, receiverSession, tablet);
         createDataPipe(uuid, false);
         Thread.sleep(2000);
         // The actual implementation logic of inserting data
@@ -390,12 +392,13 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeDualManualIT
     String sql =
         String.format(
             "create pipe test%s"
-                + " with source 
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')"
+                + " with source 
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
                 + " with processor ('processor'='do-nothing-processor')"
                 + " with sink 
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
             diff,
             isTSFile ? "file" : "forced-log",
             !isTSFile,
+            isTSFile,
             receiverEnv.getIP(),
             receiverEnv.getPort(),
             isTSFile ? "tsfile" : "tablet");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 1788ba02cca..ac87f53cdfa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
@@ -87,7 +88,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor
       final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
     if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
         // Ignore the error if it is caused by insufficient memory
-        || (status.getMessage() != null && 
status.getMessage().contains("memory"))) {
+        || (status.getMessage() != null && 
status.getMessage().contains("memory"))
+        || !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
       return Optional.empty();
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3ac297e1c69..dbe81b6e2ef 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -299,6 +299,8 @@ public class CommonConfig {
   private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
   private double pipeReceiverActualToEstimatedMemoryRatio = 3;
 
+  private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
+  private boolean pipeReceiverLoadConversionEnabled = false;
   private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; 
// Deprecated
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 
Integer.MAX_VALUE; // Deprecated
   private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated
@@ -1569,6 +1571,18 @@ public class CommonConfig {
     return pipeMaxAllowedLinkedTsFileCount;
   }
 
+  public boolean isPipeReceiverLoadConversionEnabled() {
+    return pipeReceiverLoadConversionEnabled;
+  }
+
+  public void setPipeReceiverLoadConversionEnabled(boolean 
pipeReceiverLoadConversionEnabled) {
+    if (this.pipeReceiverLoadConversionEnabled == 
pipeReceiverLoadConversionEnabled) {
+      return;
+    }
+    this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
+    logger.info("pipeReceiverConversionEnabled is set to {}.", 
pipeReceiverLoadConversionEnabled);
+  }
+
   public void setPipeMaxAllowedLinkedTsFileCount(long 
pipeMaxAllowedLinkedTsFileCount) {
     if (this.pipeMaxAllowedLinkedTsFileCount == 
pipeMaxAllowedLinkedTsFileCount) {
       return;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 25fe88d72c9..89f5187eef9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -345,6 +345,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
   }
 
+  public boolean isPipeReceiverLoadConversionEnabled() {
+    return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
+  }
+
   /////////////////////////////// Logger ///////////////////////////////
 
   public double getPipeMetaReportMaxLogNumPerRound() {
@@ -576,6 +580,7 @@ public class PipeConfig {
         "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
         getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
     LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", 
getPipeMaxAllowedLinkedTsFileCount());
+    LOGGER.info("PipeReceiverLoadConversionEnabled: {}", 
isPipeReceiverLoadConversionEnabled());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 9e13bb04ade..31d9f1ec3ce 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -458,6 +458,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_max_allowed_linked_tsfile_count",
                 String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
+    config.setPipeReceiverLoadConversionEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_receiver_load_conversion_enabled",
+                
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
 
     config.setPipeMemoryAllocateMaxRetries(
         Integer.parseInt(

Reply via email to