This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/2.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/2.0.1 by this push:
new 307cd5878b7 Pipe: Add parameter check for forwarding-pipe-requests
307cd5878b7 is described below
commit 307cd5878b737c54265cbf929636424b6d4663b1
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Feb 12 19:04:51 2025 +0800
Pipe: Add parameter check for forwarding-pipe-requests
---
.../pipe/it/autocreate/IoTDBPipeAutoConflictIT.java | 3 +++
.../iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java | 4 ++++
.../iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java | 2 ++
.../iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java | 2 ++
.../pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java | 3 +++
.../iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java | 3 +++
.../pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java | 2 ++
.../pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java | 3 +++
.../iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java | 2 ++
.../iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java | 1 +
.../extractor/dataregion/IoTDBDataRegionExtractor.java | 16 ++++++++++++++++
11 files changed, 41 insertions(+)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
index 65eb2458df7..c75edf4254b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -76,6 +77,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualAutoIT {
receiverEnv.initClusterEnvironment();
}
+ @Ignore
@Test
public void testDoubleLivingAutoConflict() throws Exception {
// Double living is two clusters each with a pipe connecting to the other.
@@ -225,6 +227,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualAutoIT {
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
}
+ @Ignore
@Test
public void testDoubleLivingAutoConflictTemplate() throws Exception {
final DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
index 49f7b1fd3f3..d4f5ecfcea1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -43,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+@Ignore
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeIdempotentIT extends AbstractPipeDualAutoIT {
@@ -370,6 +372,7 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualAutoIT {
Collections.singleton("1,"));
}
+ @Ignore
@Test
public void testDropUserIdempotent() throws Exception {
testIdempotent(
@@ -381,6 +384,7 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualAutoIT {
Collections.singleton("1,"));
}
+ @Ignore
@Test
public void testDropRoleIdempotent() throws Exception {
testIdempotent(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
index f151463b396..38c7e3e7145 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeLifeCycleIT.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -620,6 +621,7 @@ public class IoTDBPipeLifeCycleIT extends
AbstractPipeDualAutoIT {
}
}
+ @Ignore
@Test
public void testDoubleLiving() throws Exception {
// Double living is two clusters with pipes connecting each other.
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java
index dbb18109228..631ed60f4d3 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeManualConflictIT.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -39,6 +40,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+@Ignore
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2ManualCreateSchema.class})
public class IoTDBPipeManualConflictIT extends AbstractPipeDualManualIT {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
index c03f56bad4e..2074a7f4345 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaLeaderChangeIT.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -66,6 +67,7 @@ public class IoTDBPipeMetaLeaderChangeIT extends
AbstractPipeDualManualIT {
receiverEnv.initClusterEnvironment();
}
+ @Ignore
@Test
public void testConfigNodeLeaderChange() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -125,6 +127,7 @@ public class IoTDBPipeMetaLeaderChangeIT extends
AbstractPipeDualManualIT {
receiverEnv, "count databases", "count,",
Collections.singleton("20,"));
}
+ @Ignore
@Test
public void testSchemaRegionLeaderChange() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
index 76cd4a90e8b..915ee4a86a1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaRestartIT.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -40,6 +41,7 @@ import java.util.Map;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2ManualCreateSchema.class})
public class IoTDBPipeMetaRestartIT extends AbstractPipeDualManualIT {
+ @Ignore
@Test
public void testAutoRestartSchemaTask() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -106,6 +108,7 @@ public class IoTDBPipeMetaRestartIT extends
AbstractPipeDualManualIT {
receiverEnv, "count timeseries", "count(timeseries),",
Collections.singleton("20,"));
}
+ @Ignore
@Test
public void testAutoRestartConfigTask() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java
index 769b940c78d..231d4dc4e5a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMultiSchemaRegionIT.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -41,6 +42,7 @@ import java.util.Map;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2ManualCreateSchema.class})
public class IoTDBPipeMultiSchemaRegionIT extends AbstractPipeDualManualIT {
+ @Ignore
@Test
public void testMultiSchemaRegion() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java
index 525be694815..30736f4a8cb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -76,6 +77,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeTableModelTestIT {
receiverEnv.initClusterEnvironment();
}
+ @Ignore
@Test
public void testDoubleLivingAutoConflict() throws Exception {
// Double living is two clusters each with a pipe connecting to the other.
@@ -173,6 +175,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeTableModelTestIT {
TableModelUtils.assertData("test", "test1", 0, 600, receiverEnv,
handleFailure);
}
+ @Ignore
@Test
public void testDoubleLivingAutoConflictTemplate() throws Exception {
final DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
index 1faf4a4c65c..e2f2aebb6f3 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.write.record.Tablet;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -239,6 +240,7 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeTableModelTestIT {
}
}
+ @Ignore
@Test
public void testWriteBackSink() throws Exception {
try (final SyncConfigNodeIServiceClient client =
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java
index 88cfc1bffd7..a2a8e1f37ec 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java
@@ -575,6 +575,7 @@ public class IoTDBPipeLifeCycleIT extends
AbstractPipeTableModelTestIT {
}
}
+ @Ignore
@Test
public void testDoubleLiving() throws Exception {
boolean insertResult = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index f2c1e553bdd..1320f90c088 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -140,6 +141,21 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
public void validate(final PipeParameterValidator validator) throws
Exception {
super.validate(validator);
+ final boolean forwardingPipeRequests =
+ validator
+ .getParameters()
+ .getBooleanOrDefault(
+ Arrays.asList(
+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+ PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ if (!forwardingPipeRequests) {
+ throw new PipeParameterNotValidException(
+ String.format(
+ "The parameter %s cannot be set to false.",
+ PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY));
+ }
+
// Validate whether the pipe needs to extract table model data or tree
model data
final boolean isTreeDialect =
validator