This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2738e7afe91 Pipe: Fix HistoricalDataRegionTsFileAndDeletionSource 
`double-living` parameter failure (#16667)
2738e7afe91 is described below

commit 2738e7afe91cbf020cd0bbfcabb27880c77d473a
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Oct 29 12:21:22 2025 +0800

    Pipe: Fix HistoricalDataRegionTsFileAndDeletionSource `double-living` 
parameter failure (#16667)
---
 .../manual/IoTDBPipeTsFileDecompositionWithModsIT.java | 14 +++++++-------
 ...ipeHistoricalDataRegionTsFileAndDeletionSource.java | 18 ++++++++++++++----
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
index b939ec79b8b..89f1c3fb440 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -112,7 +112,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends 
AbstractPipeDualTree
           .append(",")
           .append(3.0f)
           .append(")");
-      if (i % 100 != 0) {
+      if (i % 50 != 0) {
         insertBuilder.append(",");
       } else {
         TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder.toString());
@@ -254,7 +254,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends 
AbstractPipeDualTree
           .append(",")
           .append(3.0f)
           .append(")");
-      if (i % 100 != 0) {
+      if (i % 50 != 0) {
         insertBuilder.append(",");
       } else {
         TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder.toString());
@@ -369,7 +369,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends 
AbstractPipeDualTree
     StringBuilder insertBuilder1 = new StringBuilder(s1);
     for (int i = 1; i <= 20000; i++) {
       
insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
-      if (i % 1000 != 0) {
+      if (i % 50 != 0) {
         insertBuilder1.append(",");
       } else {
         TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder1.toString());
@@ -386,7 +386,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends 
AbstractPipeDualTree
     StringBuilder insertBuilder2 = new StringBuilder(s2);
     for (int i = 10001; i <= 30000; i++) {
       
insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
-      if (i % 1000 != 0) {
+      if (i % 50 != 0) {
         insertBuilder2.append(",");
       } else {
         TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder2.toString());
@@ -403,7 +403,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends 
AbstractPipeDualTree
     StringBuilder insertBuilder3 = new StringBuilder(s3);
     for (int i = 20001; i <= 40000; i++) {
       
insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
-      if (i % 1000 != 0) {
+      if (i % 50 != 0) {
         insertBuilder3.append(",");
       } else {
         TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder3.toString());
@@ -420,7 +420,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends 
AbstractPipeDualTree
     StringBuilder insertBuilder4 = new StringBuilder(s4);
     for (int i = 30001; i <= 50000; i++) {
       
insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
-      if (i % 1000 != 0) {
+      if (i % 50 != 0) {
         insertBuilder4.append(",");
       } else {
         TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder4.toString());
@@ -437,7 +437,7 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends 
AbstractPipeDualTree
     StringBuilder insertBuilder5 = new StringBuilder(s5);
     for (int i = 40001; i <= 60000; i++) {
       
insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
-      if (i % 1000 != 0) {
+      if (i % 50 != 0) {
         insertBuilder5.append(",");
       } else {
         TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder5.toString());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index c5d1734a630..3112c3220fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -373,12 +373,22 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
 
     skipIfNoPrivileges = getSkipIfNoPrivileges(parameters);
 
-    isForwardingPipeRequests =
+    final boolean isDoubleLiving =
         parameters.getBooleanOrDefault(
             Arrays.asList(
-                PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
-                PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-            
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+                PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
+                PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
+            PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
+    if (isDoubleLiving) {
+      isForwardingPipeRequests = false;
+    } else {
+      isForwardingPipeRequests =
+          parameters.getBooleanOrDefault(
+              Arrays.asList(
+                  PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+                  PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+              
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+    }
 
     if (LOGGER.isInfoEnabled()) {
       LOGGER.info(

Reply via email to