This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 902aa35e14c792c4c11a38373ffbc2bea03d8ec8 Author: Caideyipi <[email protected]> AuthorDate: Thu Aug 7 11:38:57 2025 +0800 [To dev/1.3] Pipe: Totally banned the receiver conversion (#16086) (#16105) * Pipe: Totally banned the receiver conversion (#16086) * logger * ci-fix * partial * Delete client-go * fix * rename * fix-ci * revert-cp --- .../pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java | 5 ++++- .../PipeStatementDataTypeConvertExecutionVisitor.java | 4 +++- .../java/org/apache/iotdb/commons/conf/CommonConfig.java | 14 ++++++++++++++ .../org/apache/iotdb/commons/pipe/config/PipeConfig.java | 5 +++++ .../apache/iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++++ 5 files changed, 31 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java index fc2b32e145b..c4198e1a9fe 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java @@ -342,6 +342,8 @@ public class IoTDBPipeTypeConversionISessionIT extends AbstractPipeDualManualIT senderSession.executeNonQueryStatement("flush"); } else { // Send Tablet data to receiver + // Write once to create data regions, guarantee that no any tsFiles will be sent + consumer.accept(senderSession, receiverSession, tablet); createDataPipe(uuid, false); Thread.sleep(2000); // The actual implementation logic of inserting data @@ -390,12 +392,13 @@ public class IoTDBPipeTypeConversionISessionIT extends AbstractPipeDualManualIT String sql = String.format( "create pipe test%s" - + " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')" + + " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')" + " with processor ('processor'='do-nothing-processor')" + " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')", diff, isTSFile ? "file" : "forced-log", !isTSFile, + isTSFile, receiverEnv.getIP(), receiverEnv.getPort(), isTSFile ? "tsfile" : "tablet"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java index 1788ba02cca..ac87f53cdfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; @@ -87,7 +88,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode() // Ignore the error if it is caused by insufficient memory - || (status.getMessage() != null && status.getMessage().contains("memory"))) { + || (status.getMessage() != null && status.getMessage().contains("memory")) + || !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) { return Optional.empty(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 3ac297e1c69..dbe81b6e2ef 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -299,6 +299,8 @@ public class CommonConfig { private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000; private double pipeReceiverActualToEstimatedMemoryRatio = 3; + private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB + private boolean pipeReceiverLoadConversionEnabled = false; private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; // Deprecated private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = Integer.MAX_VALUE; // Deprecated private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated @@ -1569,6 +1571,18 @@ public class CommonConfig { return pipeMaxAllowedLinkedTsFileCount; } + public boolean isPipeReceiverLoadConversionEnabled() { + return pipeReceiverLoadConversionEnabled; + } + + public void setPipeReceiverLoadConversionEnabled(boolean pipeReceiverLoadConversionEnabled) { + if (this.pipeReceiverLoadConversionEnabled == pipeReceiverLoadConversionEnabled) { + return; + } + this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled; + logger.info("pipeReceiverConversionEnabled is set to {}.", pipeReceiverLoadConversionEnabled); + } + public void setPipeMaxAllowedLinkedTsFileCount(long pipeMaxAllowedLinkedTsFileCount) { if (this.pipeMaxAllowedLinkedTsFileCount == pipeMaxAllowedLinkedTsFileCount) { return; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 25fe88d72c9..89f5187eef9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -345,6 +345,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount(); } + public boolean isPipeReceiverLoadConversionEnabled() { + return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled(); + } + /////////////////////////////// Logger /////////////////////////////// public double getPipeMetaReportMaxLogNumPerRound() { @@ -576,6 +580,7 @@ public class PipeConfig { "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}", getPipeMaxAllowedPendingTsFileEpochPerDataRegion()); LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount()); + LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled()); LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound()); LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 9e13bb04ade..31d9f1ec3ce 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -458,6 +458,11 @@ public class PipeDescriptor { properties.getProperty( "pipe_max_allowed_linked_tsfile_count", String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount())))); + config.setPipeReceiverLoadConversionEnabled( + Boolean.parseBoolean( + properties.getProperty( + "pipe_receiver_load_conversion_enabled", + String.valueOf(config.isPipeReceiverLoadConversionEnabled())))); config.setPipeMemoryAllocateMaxRetries( Integer.parseInt(
