This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 07117e00854 Pipe: Totally banned the receiver conversion (#16086)
07117e00854 is described below
commit 07117e00854f8c7e20c461d533a6d0249e2addf0
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 5 16:27:14 2025 +0800
Pipe: Totally banned the receiver conversion (#16086)
* logger
* ci-fix
* partial
* Delete client-go
* fix
* rename
* fix-ci
---
.../dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java | 8 +++-----
.../manual/enhanced/IoTDBPipeTypeConversionISessionIT.java | 5 ++++-
.../treemodel/manual/IoTDBPipeTypeConversionISessionIT.java | 5 ++++-
.../PipeTableStatementDataTypeConvertExecutionVisitor.java | 5 +++++
.../PipeTreeStatementDataTypeConvertExecutionVisitor.java | 4 +++-
.../java/org/apache/iotdb/commons/conf/CommonConfig.java | 13 +++++++++++++
.../org/apache/iotdb/commons/pipe/config/PipeConfig.java | 5 +++++
.../apache/iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++++
pom.xml | 1 +
9 files changed, 43 insertions(+), 8 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
index 50e96dc3e28..b714fe79ca7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
@@ -208,15 +208,13 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeTableModelDualManualIT {
}
Set<String> expectedResSet = new java.util.HashSet<>();
-
expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,null,null,null,null,");
-
expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,null,null,null,null,");
-
expectedResSet.add("1970-01-01T00:00:00.002Z,null,null,null,null,d1,d2,blue,2,");
-
expectedResSet.add("1970-01-01T00:00:00.001Z,null,null,null,null,d1,d2,red,1,");
+ expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,");
+ expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,");
// make sure data are not transferred
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from t1",
- "time,tag3,tag4,s3,s4,tag1,tag2,s1,s2,",
+ "time,tag3,tag4,s3,s4,",
expectedResSet,
"db",
handleFailure);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java
index f4dfca32c4c..1684ca1f7f5 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java
@@ -128,6 +128,8 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelDua
createDataPipe(true);
} else {
// Send Tablet data to receiver
+ // Write once to create data regions, guaranteeing that no tsFiles
will be sent
+ executeDataWriteOperation.accept(senderSession, receiverSession,
tablet);
createDataPipe(false);
// The actual implementation logic of inserting data
executeDataWriteOperation.accept(senderSession, receiverSession,
tablet);
@@ -195,10 +197,11 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelDua
String sql =
String.format(
"create pipe test"
- + " with source ('source'='iotdb-source','realtime.mode'='%s')"
+ + " with source
('source'='iotdb-source','realtime.mode'='%s','history.enable'='%s')"
+ " with processor ('processor'='do-nothing-processor')"
+ " with sink
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
isTSFile ? "file" : "forced-log",
+ isTSFile,
receiverEnv.getIP(),
receiverEnv.getPort(),
isTSFile ? "tsfile" : "tablet");
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
index 2d89285d72a..cefc3c7df71 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java
@@ -346,6 +346,8 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeDualTreeModel
senderSession.executeNonQueryStatement("flush");
} else {
// Send Tablet data to receiver
+ // Write once to create data regions, guaranteeing that no tsFiles
will be sent
+ consumer.accept(senderSession, receiverSession, tablet);
createDataPipe(uuid, false);
Thread.sleep(2000);
// The actual implementation logic of inserting data
@@ -395,12 +397,13 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeDualTreeModel
String sql =
String.format(
"create pipe test%s"
- + " with source
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')"
+ + " with source
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
+ " with processor ('processor'='do-nothing-processor')"
+ " with sink
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
diff,
isTSFile ? "file" : "forced-log",
!isTSFile,
+ isTSFile,
receiverEnv.getIP(),
receiverEnv.getPort(),
isTSFile ? "tsfile" : "tablet");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
index 4e7530f05fe..5fb87e550df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
@@ -108,6 +109,10 @@ public class
PipeTableStatementDataTypeConvertExecutionVisitor
return Optional.empty();
}
+ if (!PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
+ return Optional.empty();
+ }
+
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
// Ignore the error if it is caused by insufficient memory
|| (status.getMessage() != null &&
status.getMessage().contains("memory"))) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
index 9aa352385ed..77bbf49094c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
@@ -88,7 +89,8 @@ public class PipeTreeStatementDataTypeConvertExecutionVisitor
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
// Ignore the error if it is caused by insufficient memory
- || (status.getMessage() != null &&
status.getMessage().contains("memory"))) {
+ || (status.getMessage() != null &&
status.getMessage().contains("memory"))
+ || !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
return Optional.empty();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 345f4680069..d1dcf8d993f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -294,6 +294,7 @@ public class CommonConfig {
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
+ private boolean pipeReceiverLoadConversionEnabled = false;
private double pipeMetaReportMaxLogNumPerRound = 0.1;
private int pipeMetaReportMaxLogIntervalRounds = 360;
@@ -1496,6 +1497,18 @@ public class CommonConfig {
pipeReceiverReqDecompressedMaxLengthInBytes);
}
+ public boolean isPipeReceiverLoadConversionEnabled() {
+ return pipeReceiverLoadConversionEnabled;
+ }
+
+ public void setPipeReceiverLoadConversionEnabled(boolean
pipeReceiverLoadConversionEnabled) {
+ if (this.pipeReceiverLoadConversionEnabled ==
pipeReceiverLoadConversionEnabled) {
+ return; // value unchanged: skip the assignment and the log line
+ }
+ this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
+ logger.info("pipeReceiverLoadConversionEnabled is set to {}.",
pipeReceiverLoadConversionEnabled);
+ }
+
public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return pipeReceiverReqDecompressedMaxLengthInBytes;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 770e0e959b2..a646ed90dbf 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -343,6 +343,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
}
+ public boolean isPipeReceiverLoadConversionEnabled() {
+ return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
+ }
+
/////////////////////////////// Logger ///////////////////////////////
public double getPipeMetaReportMaxLogNumPerRound() {
@@ -573,6 +577,7 @@ public class PipeConfig {
LOGGER.info(
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
getPipeReceiverReqDecompressedMaxLengthInBytes());
+ LOGGER.info("PipeReceiverLoadConversionEnabled: {}",
isPipeReceiverLoadConversionEnabled());
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 7e27838618b..7bdfab83efa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -432,6 +432,11 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_receiver_req_decompressed_max_length_in_bytes",
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
+ config.setPipeReceiverLoadConversionEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_receiver_load_conversion_enabled",
+
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
config.setPipeMemoryAllocateMaxRetries(
Integer.parseInt(
diff --git a/pom.xml b/pom.xml
index d3cb5b56775..ac876a8d925 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@
<module>distribution</module>
<module>example</module>
<module>library-udf</module>
+ <module>integration-test</module>
</modules>
<properties>
<!-- This was the last version to support Java 8 -->