This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e7af492621034b74d80365d953099efe1e6282b0
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 5 16:27:14 2025 +0800

    Pipe: Totally banned the receiver conversion (#16086)
    
    * logger
    
    * ci-fix
    
    * partial
    
    * Delete client-go
    
    * fix
    
    * rename
    
    * fix-ci
    
    (cherry picked from commit 07117e00854f8c7e20c461d533a6d0249e2addf0)
---
 .../dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java   |  8 +++-----
 .../manual/enhanced/IoTDBPipeTypeConversionISessionIT.java  |  5 ++++-
 .../treemodel/manual/IoTDBPipeTypeConversionISessionIT.java |  5 ++++-
 .../PipeTableStatementDataTypeConvertExecutionVisitor.java  |  5 +++++
 .../PipeTreeStatementDataTypeConvertExecutionVisitor.java   |  4 +++-
 .../java/org/apache/iotdb/commons/conf/CommonConfig.java    | 13 +++++++++++++
 .../org/apache/iotdb/commons/pipe/config/PipeConfig.java    |  5 +++++
 .../apache/iotdb/commons/pipe/config/PipeDescriptor.java    |  5 +++++
 pom.xml                                                     |  1 +
 9 files changed, 43 insertions(+), 8 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
index bc4e7f02053..f9db634ded2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
@@ -204,15 +204,13 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeTableModelDualManualIT {
       }
 
       Set<String> expectedResSet = new java.util.HashSet<>();
-      
expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,null,null,null,null,");
-      
expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,null,null,null,null,");
-      
expectedResSet.add("1970-01-01T00:00:00.002Z,null,null,null,null,d1,d2,blue,2,");
-      
expectedResSet.add("1970-01-01T00:00:00.001Z,null,null,null,null,d1,d2,red,1,");
+      expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,");
+      expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,");
       // make sure data are not transferred
       TestUtils.assertDataEventuallyOnEnv(
           receiverEnv,
           "select * from t1",
-          "time,tag3,tag4,s3,s4,tag1,tag2,s1,s2,",
+          "time,tag3,tag4,s3,s4,",
           expectedResSet,
           "db",
           handleFailure);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java
index 5ffa8fadfbf..3544e13e99a 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java
@@ -128,6 +128,8 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelDua
         createDataPipe(true);
       } else {
         // Send Tablet data to receiver
+        // Write once to create data regions, guaranteeing that no tsFiles 
will be sent
+        executeDataWriteOperation.accept(senderSession, receiverSession, 
tablet);
         createDataPipe(false);
         // The actual implementation logic of inserting data
         executeDataWriteOperation.accept(senderSession, receiverSession, 
tablet);
@@ -194,10 +196,11 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelDua
     String sql =
         String.format(
             "create pipe test"
-                + " with source ('source'='iotdb-source','realtime.mode'='%s')"
+                + " with source 
('source'='iotdb-source','realtime.mode'='%s','history.enable'='%s')"
                 + " with processor ('processor'='do-nothing-processor')"
                 + " with sink 
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
             isTSFile ? "file" : "forced-log",
+            isTSFile,
             receiverEnv.getIP(),
             receiverEnv.getPort(),
             isTSFile ? "tsfile" : "tablet");
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
index 27fb9f8c81b..699deb70335 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
@@ -346,6 +346,8 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeDualTreeModel
         senderSession.executeNonQueryStatement("flush");
       } else {
         // Send Tablet data to receiver
+        // Write once to create data regions, guaranteeing that no tsFiles 
will be sent
+        consumer.accept(senderSession, receiverSession, tablet);
         createDataPipe(uuid, false);
         Thread.sleep(2000);
         // The actual implementation logic of inserting data
@@ -394,12 +396,13 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeDualTreeModel
     String sql =
         String.format(
             "create pipe test%s"
-                + " with source 
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')"
+                + " with source 
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
                 + " with processor ('processor'='do-nothing-processor')"
                 + " with sink 
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
             diff,
             isTSFile ? "file" : "forced-log",
             !isTSFile,
+            isTSFile,
             receiverEnv.getIP(),
             receiverEnv.getPort(),
             isTSFile ? "tsfile" : "tablet");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
index 4e7530f05fe..5fb87e550df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
@@ -108,6 +109,10 @@ public class 
PipeTableStatementDataTypeConvertExecutionVisitor
       return Optional.empty();
     }
 
+    if (!PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
+      return Optional.empty();
+    }
+
     if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
         // Ignore the error if it is caused by insufficient memory
         || (status.getMessage() != null && 
status.getMessage().contains("memory"))) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
index 9aa352385ed..77bbf49094c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
@@ -88,7 +89,8 @@ public class PipeTreeStatementDataTypeConvertExecutionVisitor
       final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
     if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
         // Ignore the error if it is caused by insufficient memory
-        || (status.getMessage() != null && 
status.getMessage().contains("memory"))) {
+        || (status.getMessage() != null && 
status.getMessage().contains("memory"))
+        || !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
       return Optional.empty();
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 604e65e153b..648cfeaad68 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -294,6 +294,7 @@ public class CommonConfig {
   private double pipeReceiverActualToEstimatedMemoryRatio = 3;
 
   private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
+  private boolean pipeReceiverLoadConversionEnabled = false;
 
   private double pipeMetaReportMaxLogNumPerRound = 0.1;
   private int pipeMetaReportMaxLogIntervalRounds = 360;
@@ -1491,6 +1492,18 @@ public class CommonConfig {
         pipeReceiverReqDecompressedMaxLengthInBytes);
   }
 
+  public boolean isPipeReceiverLoadConversionEnabled() {
+    return pipeReceiverLoadConversionEnabled;
+  }
+
+  public void setPipeReceiverLoadConversionEnabled(boolean 
pipeReceiverLoadConversionEnabled) {
+    if (this.pipeReceiverLoadConversionEnabled == 
pipeReceiverLoadConversionEnabled) {
+      return;
+    }
+    this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
+    logger.info("pipeReceiverConversionEnabled is set to {}.", 
pipeReceiverLoadConversionEnabled);
+  }
+
   public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
     return pipeReceiverReqDecompressedMaxLengthInBytes;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 770e0e959b2..a646ed90dbf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -343,6 +343,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
   }
 
+  public boolean isPipeReceiverLoadConversionEnabled() {
+    return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
+  }
+
   /////////////////////////////// Logger ///////////////////////////////
 
   public double getPipeMetaReportMaxLogNumPerRound() {
@@ -573,6 +577,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeReceiverReqDecompressedMaxLengthInBytes: {}",
         getPipeReceiverReqDecompressedMaxLengthInBytes());
+    LOGGER.info("PipeReceiverLoadConversionEnabled: {}", 
isPipeReceiverLoadConversionEnabled());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 7e27838618b..7bdfab83efa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -432,6 +432,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_receiver_req_decompressed_max_length_in_bytes",
                 
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
+    config.setPipeReceiverLoadConversionEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_receiver_load_conversion_enabled",
+                
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
 
     config.setPipeMemoryAllocateMaxRetries(
         Integer.parseInt(
diff --git a/pom.xml b/pom.xml
index 9eaecdb3a23..0da94ccaf91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@
         <module>distribution</module>
         <module>example</module>
         <module>library-udf</module>
+        <module>integration-test</module>
     </modules>
     <properties>
         <!-- This was the last version to support Java 8 -->

Reply via email to