This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2738e7afe91 Pipe: Fix HistoricalDataRegionTsFileAndDeletionSource
`double-living` parameter failure (#16667)
2738e7afe91 is described below
commit 2738e7afe91cbf020cd0bbfcabb27880c77d473a
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Oct 29 12:21:22 2025 +0800
Pipe: Fix HistoricalDataRegionTsFileAndDeletionSource `double-living`
parameter failure (#16667)
---
.../manual/IoTDBPipeTsFileDecompositionWithModsIT.java | 14 +++++++-------
...ipeHistoricalDataRegionTsFileAndDeletionSource.java | 18 ++++++++++++++----
2 files changed, 21 insertions(+), 11 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
index b939ec79b8b..89f1c3fb440 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -112,7 +112,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualTree
.append(",")
.append(3.0f)
.append(")");
- if (i % 100 != 0) {
+ if (i % 50 != 0) {
insertBuilder.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder.toString());
@@ -254,7 +254,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualTree
.append(",")
.append(3.0f)
.append(")");
- if (i % 100 != 0) {
+ if (i % 50 != 0) {
insertBuilder.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder.toString());
@@ -369,7 +369,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualTree
StringBuilder insertBuilder1 = new StringBuilder(s1);
for (int i = 1; i <= 20000; i++) {
insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
- if (i % 1000 != 0) {
+ if (i % 50 != 0) {
insertBuilder1.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder1.toString());
@@ -386,7 +386,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualTree
StringBuilder insertBuilder2 = new StringBuilder(s2);
for (int i = 10001; i <= 30000; i++) {
insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
- if (i % 1000 != 0) {
+ if (i % 50 != 0) {
insertBuilder2.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder2.toString());
@@ -403,7 +403,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualTree
StringBuilder insertBuilder3 = new StringBuilder(s3);
for (int i = 20001; i <= 40000; i++) {
insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
- if (i % 1000 != 0) {
+ if (i % 50 != 0) {
insertBuilder3.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder3.toString());
@@ -420,7 +420,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualTree
StringBuilder insertBuilder4 = new StringBuilder(s4);
for (int i = 30001; i <= 50000; i++) {
insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
- if (i % 1000 != 0) {
+ if (i % 50 != 0) {
insertBuilder4.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder4.toString());
@@ -437,7 +437,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeDualTree
StringBuilder insertBuilder5 = new StringBuilder(s5);
for (int i = 40001; i <= 60000; i++) {
insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
- if (i % 1000 != 0) {
+ if (i % 50 != 0) {
insertBuilder5.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv,
insertBuilder5.toString());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index c5d1734a630..3112c3220fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -373,12 +373,22 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
skipIfNoPrivileges = getSkipIfNoPrivileges(parameters);
- isForwardingPipeRequests =
+ final boolean isDoubleLiving =
parameters.getBooleanOrDefault(
Arrays.asList(
- PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
- PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+ PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+ PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+ if (isDoubleLiving) {
+ isForwardingPipeRequests = false;
+ } else {
+ isForwardingPipeRequests =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+ PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ }
if (LOGGER.isInfoEnabled()) {
LOGGER.info(