This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f1d8fce46c2 Pipe: Fix PipeSetTTLPlan is not handled correctly (#12571)
f1d8fce46c2 is described below
commit f1d8fce46c2e967d89fc9e07c796a39b7645c80a
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 22 19:00:06 2024 +0800
Pipe: Fix PipeSetTTLPlan is not handled correctly (#12571)
---
.../apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java | 5 ++---
.../manager/pipe/event/PipeConfigRegionSnapshotEvent.java | 1 +
.../manager/pipe/extractor/ConfigRegionListeningFilter.java | 8 ++++++--
.../pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java | 6 +++++-
.../iotdb/confignode/persistence/executor/ConfigPlanExecutor.java | 8 +++++---
.../extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java | 4 ++--
6 files changed, 21 insertions(+), 11 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
index 70316d3b600..858af4cb576 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
@@ -119,8 +119,7 @@ public class IoTDBPipeMetaHistoricalIT extends
AbstractPipeDualManualIT {
final Map<String, String> connectorAttributes = new HashMap<>();
extractorAttributes.put("extractor.inclusion", "data, schema");
- extractorAttributes.put(
- "extractor.inclusion.exclusion", "schema.timeseries.ordinary,
schema.ttl");
+ extractorAttributes.put("extractor.inclusion.exclusion",
"schema.timeseries.ordinary");
extractorAttributes.put("extractor.path", "root.ln.**");
connectorAttributes.put("connector", "iotdb-thrift-connector");
@@ -154,7 +153,7 @@ public class IoTDBPipeMetaHistoricalIT extends
AbstractPipeDualManualIT {
"Database,TTL,SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
// Receiver's SchemaReplicationFactor/DataReplicationFactor shall be
3/2 regardless of the
// sender
- Collections.singleton("root.ln,null,3,2,604800000,"));
+ Collections.singleton("root.ln,3600000,3,2,604800000,"));
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.**",
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index 407dec4887e..9e7a252eb6e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -78,6 +78,7 @@ public class PipeConfigRegionSnapshotEvent extends
PipeSnapshotEvent {
Arrays.asList(
ConfigPhysicalPlanType.CreateDatabase.getPlanType(),
ConfigPhysicalPlanType.SetTTL.getPlanType(),
+ ConfigPhysicalPlanType.PipeSetTTL.getPlanType(),
ConfigPhysicalPlanType.CreateSchemaTemplate.getPlanType(),
ConfigPhysicalPlanType.CommitSetSchemaTemplate.getPlanType()))));
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index 2f1ee641ba6..f782f3e0617 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -83,7 +83,9 @@ public class ConfigRegionListeningFilter {
Collections.singletonList(ConfigPhysicalPlanType.DropSchemaTemplate));
OPTION_PLAN_MAP.put(
new PartialPath("schema.timeseries.template.unset"),
- Collections.singletonList(ConfigPhysicalPlanType.PipeUnsetTemplate));
+ Collections.unmodifiableList(
+ Arrays.asList(
+ ConfigPhysicalPlanType.UnsetTemplate,
ConfigPhysicalPlanType.PipeUnsetTemplate)));
OPTION_PLAN_MAP.put(
new PartialPath("schema.timeseries.ordinary.drop"),
@@ -96,7 +98,9 @@ public class ConfigRegionListeningFilter {
Collections.singletonList(ConfigPhysicalPlanType.PipeDeactivateTemplate));
OPTION_PLAN_MAP.put(
- new PartialPath("schema.ttl"),
Collections.singletonList(ConfigPhysicalPlanType.SetTTL));
+ new PartialPath("schema.ttl"),
+ Collections.unmodifiableList(
+ Arrays.asList(ConfigPhysicalPlanType.SetTTL,
ConfigPhysicalPlanType.PipeSetTTL)));
OPTION_PLAN_MAP.put(
new PartialPath("auth.role.create"),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java
index e5ea0cccd3b..93424df8416 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -269,8 +270,11 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitTTL(
final SetTTLPlan setTTLPlan, final IoTDBPipePattern pattern) {
+ final PartialPath databasePath = new
PartialPath(setTTLPlan.getDatabasePathPattern());
final List<PartialPath> intersectionList =
- pattern.getIntersection(new
PartialPath(setTTLPlan.getDatabasePathPattern()));
+ pattern.matchPrefixPath(databasePath.getFullPath())
+ ? Collections.singletonList(databasePath)
+ : pattern.getIntersection(databasePath);
return !intersectionList.isEmpty()
? Optional.of(
new PipeSetTTLPlan(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index f4763406565..aa65375290a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -507,9 +507,11 @@ public class ConfigPlanExecutor {
// Will not be actually executed.
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
case PipeUnsetTemplate:
- // PipeUnsetTemplate plan will not be written here, and exists only
after pipe sender
- // collects UnsetTemplatePlan and before receiver calls ConfigManager.
- throw new UnsupportedOperationException("PipeUnsetTemplate is not
supported.");
+ case PipeSetTTL:
+ // PipeUnsetTemplate/PipeSetTTL plan will not be written here, and
exists only after pipe
+ // sender collects UnsetTemplatePlan/SetTTLPlan and before receiver
calls ConfigManager.
+ throw new UnsupportedOperationException(
+ String.format("Plan type %s is not supported.",
physicalPlan.getType()));
case TestOnly:
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
default:
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java
index a2ff81a9da4..5b341d11da8 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java
@@ -391,13 +391,13 @@ public class
PipeConfigPhysicalPlanPatternParseVisitorTest {
((PipeSetTTLPlan)
IoTDBConfigRegionExtractor.PATTERN_PARSE_VISITOR
.visitTTL(
- new SetTTLPlan(Arrays.asList("root", "*", "device",
"s1"), Long.MAX_VALUE),
+ new SetTTLPlan(Arrays.asList("root", "db", "**"),
Long.MAX_VALUE),
prefixPathPattern)
.orElseThrow(AssertionError::new))
.getSetTTLPlans();
Assert.assertEquals(
- Collections.singletonList(new PartialPath("root.db.device.s1")),
+ Collections.singletonList(new PartialPath("root.db.device.**")),
plans.stream()
.map(setTTLPlan -> new
PartialPath(setTTLPlan.getDatabasePathPattern()))
.collect(Collectors.toList()));