This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b8d829cce3d [To dev/1.3] Pipe: Totally banned the receiver conversion
(#16086) (#16105)
b8d829cce3d is described below
commit b8d829cce3d2b3915aa80f460a6c408928c6b177
Author: Caideyipi <[email protected]>
AuthorDate: Thu Aug 7 11:38:57 2025 +0800
[To dev/1.3] Pipe: Totally banned the receiver conversion (#16086) (#16105)
* Pipe: Totally banned the receiver conversion (#16086)
* logger
* ci-fix
* partial
* Delete client-go
* fix
* rename
* fix-ci
* revert-cp
---
.../pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java | 5 ++++-
.../PipeStatementDataTypeConvertExecutionVisitor.java | 4 +++-
.../java/org/apache/iotdb/commons/conf/CommonConfig.java | 13 +++++++++++++
.../org/apache/iotdb/commons/pipe/config/PipeConfig.java | 5 +++++
.../apache/iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++++
5 files changed, 30 insertions(+), 2 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
index fc2b32e145b..c4198e1a9fe 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
@@ -342,6 +342,8 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeDualManualIT
senderSession.executeNonQueryStatement("flush");
} else {
// Send Tablet data to receiver
+ // Write once to create data regions, guarantee that no any tsFiles
will be sent
+ consumer.accept(senderSession, receiverSession, tablet);
createDataPipe(uuid, false);
Thread.sleep(2000);
// The actual implementation logic of inserting data
@@ -390,12 +392,13 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeDualManualIT
String sql =
String.format(
"create pipe test%s"
- + " with source
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')"
+ + " with source
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
+ " with processor ('processor'='do-nothing-processor')"
+ " with sink
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
diff,
isTSFile ? "file" : "forced-log",
!isTSFile,
+ isTSFile,
receiverEnv.getIP(),
receiverEnv.getPort(),
isTSFile ? "tsfile" : "tablet");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index e795c5187cf..caa4a399f67 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
@@ -87,7 +88,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
// Ignore the error if it is caused by insufficient memory
- || (status.getMessage() != null &&
status.getMessage().contains("memory"))) {
+ || (status.getMessage() != null &&
status.getMessage().contains("memory"))
+ || !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
return Optional.empty();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index f0e0bb52e46..117ee6c23a4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -298,6 +298,7 @@ public class CommonConfig {
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
+ private boolean pipeReceiverLoadConversionEnabled = false;
private double pipeMetaReportMaxLogNumPerRound = 0.1;
private int pipeMetaReportMaxLogIntervalRounds = 360;
@@ -1503,6 +1504,18 @@ public class CommonConfig {
pipeReceiverReqDecompressedMaxLengthInBytes);
}
+ public boolean isPipeReceiverLoadConversionEnabled() {
+ return pipeReceiverLoadConversionEnabled;
+ }
+
+ public void setPipeReceiverLoadConversionEnabled(boolean
pipeReceiverLoadConversionEnabled) {
+ if (this.pipeReceiverLoadConversionEnabled ==
pipeReceiverLoadConversionEnabled) {
+ return;
+ }
+ this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
+ logger.info("pipeReceiverConversionEnabled is set to {}.",
pipeReceiverLoadConversionEnabled);
+ }
+
public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return pipeReceiverReqDecompressedMaxLengthInBytes;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index dd540cc5f27..39f93fea5af 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -343,6 +343,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
}
+ public boolean isPipeReceiverLoadConversionEnabled() {
+ return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
+ }
+
/////////////////////////////// Logger ///////////////////////////////
public double getPipeMetaReportMaxLogNumPerRound() {
@@ -574,6 +578,7 @@ public class PipeConfig {
LOGGER.info(
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
getPipeReceiverReqDecompressedMaxLengthInBytes());
+ LOGGER.info("PipeReceiverLoadConversionEnabled: {}",
isPipeReceiverLoadConversionEnabled());
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 7e27838618b..7bdfab83efa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -432,6 +432,11 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_receiver_req_decompressed_max_length_in_bytes",
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
+ config.setPipeReceiverLoadConversionEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_receiver_load_conversion_enabled",
+
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
config.setPipeMemoryAllocateMaxRetries(
Integer.parseInt(