This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/2.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/2.0.1 by this push:
     new 307cd5878b7 Pipe: Add parameter check for forwarding-pipe-requests
307cd5878b7 is described below

commit 307cd5878b737c54265cbf929636424b6d4663b1
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Feb 12 19:04:51 2025 +0800

    Pipe: Add parameter check for forwarding-pipe-requests
---
 .../pipe/it/autocreate/IoTDBPipeAutoConflictIT.java      |  3 +++
 .../iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java  |  4 ++++
 .../iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java   |  2 ++
 .../iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java  |  2 ++
 .../pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java      |  3 +++
 .../iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java     |  3 +++
 .../pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java     |  2 ++
 .../pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java      |  3 +++
 .../iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java    |  2 ++
 .../iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java   |  1 +
 .../extractor/dataregion/IoTDBDataRegionExtractor.java   | 16 ++++++++++++++++
 11 files changed, 41 insertions(+)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
index 65eb2458df7..c75edf4254b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -76,6 +77,7 @@ public class IoTDBPipeAutoConflictIT extends AbstractPipeDualAutoIT {
     receiverEnv.initClusterEnvironment();
   }
 
+  @Ignore
   @Test
   public void testDoubleLivingAutoConflict() throws Exception {
     // Double living is two clusters each with a pipe connecting to the other.
@@ -225,6 +227,7 @@ public class IoTDBPipeAutoConflictIT extends AbstractPipeDualAutoIT {
         receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
   }
 
+  @Ignore
   @Test
   public void testDoubleLivingAutoConflictTemplate() throws Exception {
     final DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
index 49f7b1fd3f3..d4f5ecfcea1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+@Ignore
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeIdempotentIT extends AbstractPipeDualAutoIT {
@@ -370,6 +372,7 @@ public class IoTDBPipeIdempotentIT extends AbstractPipeDualAutoIT {
         Collections.singleton("1,"));
   }
 
+  @Ignore
   @Test
   public void testDropUserIdempotent() throws Exception {
     testIdempotent(
@@ -381,6 +384,7 @@ public class IoTDBPipeIdempotentIT extends AbstractPipeDualAutoIT {
         Collections.singleton("1,"));
   }
 
+  @Ignore
   @Test
   public void testDropRoleIdempotent() throws Exception {
     testIdempotent(
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
index f151463b396..38c7e3e7145 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -620,6 +621,7 @@ public class IoTDBPipeLifeCycleIT extends AbstractPipeDualAutoIT {
     }
   }
 
+  @Ignore
   @Test
   public void testDoubleLiving() throws Exception {
     // Double living is two clusters with pipes connecting each other.
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java
index dbb18109228..631ed60f4d3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -39,6 +40,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 
+@Ignore
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2ManualCreateSchema.class})
 public class IoTDBPipeManualConflictIT extends AbstractPipeDualManualIT {
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
index c03f56bad4e..2074a7f4345 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -66,6 +67,7 @@ public class IoTDBPipeMetaLeaderChangeIT extends AbstractPipeDualManualIT {
     receiverEnv.initClusterEnvironment();
   }
 
+  @Ignore
   @Test
   public void testConfigNodeLeaderChange() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -125,6 +127,7 @@ public class IoTDBPipeMetaLeaderChangeIT extends AbstractPipeDualManualIT {
         receiverEnv, "count databases", "count,", Collections.singleton("20,"));
   }
 
+  @Ignore
   @Test
   public void testSchemaRegionLeaderChange() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
index 76cd4a90e8b..915ee4a86a1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -40,6 +41,7 @@ import java.util.Map;
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2ManualCreateSchema.class})
 public class IoTDBPipeMetaRestartIT extends AbstractPipeDualManualIT {
+  @Ignore
   @Test
   public void testAutoRestartSchemaTask() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -106,6 +108,7 @@ public class IoTDBPipeMetaRestartIT extends AbstractPipeDualManualIT {
         receiverEnv, "count timeseries", "count(timeseries),", Collections.singleton("20,"));
   }
 
+  @Ignore
   @Test
   public void testAutoRestartConfigTask() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java
index 769b940c78d..231d4dc4e5a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -41,6 +42,7 @@ import java.util.Map;
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2ManualCreateSchema.class})
 public class IoTDBPipeMultiSchemaRegionIT extends AbstractPipeDualManualIT {
+  @Ignore
   @Test
   public void testMultiSchemaRegion() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java
index 525be694815..30736f4a8cb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -76,6 +77,7 @@ public class IoTDBPipeAutoConflictIT extends AbstractPipeTableModelTestIT {
     receiverEnv.initClusterEnvironment();
   }
 
+  @Ignore
   @Test
   public void testDoubleLivingAutoConflict() throws Exception {
     // Double living is two clusters each with a pipe connecting to the other.
@@ -173,6 +175,7 @@ public class IoTDBPipeAutoConflictIT extends AbstractPipeTableModelTestIT {
     TableModelUtils.assertData("test", "test1", 0, 600, receiverEnv, handleFailure);
   }
 
+  @Ignore
   @Test
   public void testDoubleLivingAutoConflictTemplate() throws Exception {
     final DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
index 1faf4a4c65c..e2f2aebb6f3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.write.record.Tablet;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -239,6 +240,7 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeTableModelTestIT {
     }
   }
 
+  @Ignore
   @Test
   public void testWriteBackSink() throws Exception {
     try (final SyncConfigNodeIServiceClient client =
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java
index 88cfc1bffd7..a2a8e1f37ec 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java
@@ -575,6 +575,7 @@ public class IoTDBPipeLifeCycleIT extends AbstractPipeTableModelTestIT {
     }
   }
 
+  @Ignore
   @Test
   public void testDoubleLiving() throws Exception {
     boolean insertResult = true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index f2c1e553bdd..1320f90c088 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
@@ -140,6 +141,21 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
   public void validate(final PipeParameterValidator validator) throws Exception {
     super.validate(validator);
 
+    final boolean forwardingPipeRequests =
+        validator
+            .getParameters()
+            .getBooleanOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+                    PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+                PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+    if (!forwardingPipeRequests) {
+      throw new PipeParameterNotValidException(
+          String.format(
+              "The parameter %s cannot be set to false.",
+              PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY));
+    }
+
     // Validate whether the pipe needs to extract table model data or tree model data
     final boolean isTreeDialect =
         validator

Reply via email to