This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f1d8fce46c2 Pipe: Fix PipeSetTTLPlan is not handled correctly (#12571)
f1d8fce46c2 is described below

commit f1d8fce46c2e967d89fc9e07c796a39b7645c80a
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 22 19:00:06 2024 +0800

    Pipe: Fix PipeSetTTLPlan is not handled correctly (#12571)
---
 .../apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java    | 5 ++---
 .../manager/pipe/event/PipeConfigRegionSnapshotEvent.java         | 1 +
 .../manager/pipe/extractor/ConfigRegionListeningFilter.java       | 8 ++++++--
 .../pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java | 6 +++++-
 .../iotdb/confignode/persistence/executor/ConfigPlanExecutor.java | 8 +++++---
 .../extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java  | 4 ++--
 6 files changed, 21 insertions(+), 11 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
index 70316d3b600..858af4cb576 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
@@ -119,8 +119,7 @@ public class IoTDBPipeMetaHistoricalIT extends 
AbstractPipeDualManualIT {
       final Map<String, String> connectorAttributes = new HashMap<>();
 
       extractorAttributes.put("extractor.inclusion", "data, schema");
-      extractorAttributes.put(
-          "extractor.inclusion.exclusion", "schema.timeseries.ordinary, 
schema.ttl");
+      extractorAttributes.put("extractor.inclusion.exclusion", 
"schema.timeseries.ordinary");
       extractorAttributes.put("extractor.path", "root.ln.**");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
@@ -154,7 +153,7 @@ public class IoTDBPipeMetaHistoricalIT extends 
AbstractPipeDualManualIT {
           
"Database,TTL,SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
           // Receiver's SchemaReplicationFactor/DataReplicationFactor shall be 
3/2 regardless of the
           // sender
-          Collections.singleton("root.ln,null,3,2,604800000,"));
+          Collections.singleton("root.ln,3600000,3,2,604800000,"));
       TestUtils.assertDataEventuallyOnEnv(
           receiverEnv,
           "select * from root.**",
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index 407dec4887e..9e7a252eb6e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -78,6 +78,7 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
                 Arrays.asList(
                     ConfigPhysicalPlanType.CreateDatabase.getPlanType(),
                     ConfigPhysicalPlanType.SetTTL.getPlanType(),
+                    ConfigPhysicalPlanType.PipeSetTTL.getPlanType(),
                     ConfigPhysicalPlanType.CreateSchemaTemplate.getPlanType(),
                     
ConfigPhysicalPlanType.CommitSetSchemaTemplate.getPlanType()))));
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index 2f1ee641ba6..f782f3e0617 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -83,7 +83,9 @@ public class ConfigRegionListeningFilter {
           
Collections.singletonList(ConfigPhysicalPlanType.DropSchemaTemplate));
       OPTION_PLAN_MAP.put(
           new PartialPath("schema.timeseries.template.unset"),
-          Collections.singletonList(ConfigPhysicalPlanType.PipeUnsetTemplate));
+          Collections.unmodifiableList(
+              Arrays.asList(
+                  ConfigPhysicalPlanType.UnsetTemplate, 
ConfigPhysicalPlanType.PipeUnsetTemplate)));
 
       OPTION_PLAN_MAP.put(
           new PartialPath("schema.timeseries.ordinary.drop"),
@@ -96,7 +98,9 @@ public class ConfigRegionListeningFilter {
           
Collections.singletonList(ConfigPhysicalPlanType.PipeDeactivateTemplate));
 
       OPTION_PLAN_MAP.put(
-          new PartialPath("schema.ttl"), 
Collections.singletonList(ConfigPhysicalPlanType.SetTTL));
+          new PartialPath("schema.ttl"),
+          Collections.unmodifiableList(
+              Arrays.asList(ConfigPhysicalPlanType.SetTTL, 
ConfigPhysicalPlanType.PipeSetTTL)));
 
       OPTION_PLAN_MAP.put(
           new PartialPath("auth.role.create"),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java
index e5ea0cccd3b..93424df8416 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitor.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -269,8 +270,11 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
   @Override
   public Optional<ConfigPhysicalPlan> visitTTL(
       final SetTTLPlan setTTLPlan, final IoTDBPipePattern pattern) {
+    final PartialPath databasePath = new 
PartialPath(setTTLPlan.getDatabasePathPattern());
     final List<PartialPath> intersectionList =
-        pattern.getIntersection(new 
PartialPath(setTTLPlan.getDatabasePathPattern()));
+        pattern.matchPrefixPath(databasePath.getFullPath())
+            ? Collections.singletonList(databasePath)
+            : pattern.getIntersection(databasePath);
     return !intersectionList.isEmpty()
         ? Optional.of(
             new PipeSetTTLPlan(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index f4763406565..aa65375290a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -507,9 +507,11 @@ public class ConfigPlanExecutor {
         // Will not be actually executed.
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       case PipeUnsetTemplate:
-        // PipeUnsetTemplate plan will not be written here, and exists only 
after pipe sender
-        // collects UnsetTemplatePlan and before receiver calls ConfigManager.
-        throw new UnsupportedOperationException("PipeUnsetTemplate is not 
supported.");
+      case PipeSetTTL:
+        // PipeUnsetTemplate/PipeSetTTL plan will not be written here, and 
exists only after pipe
+        // sender collects UnsetTemplatePlan/SetTTLPlan and before receiver 
calls ConfigManager.
+        throw new UnsupportedOperationException(
+            String.format("Plan type %s is not supported.", 
physicalPlan.getType()));
       case TestOnly:
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       default:
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java
index a2ff81a9da4..5b341d11da8 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanPatternParseVisitorTest.java
@@ -391,13 +391,13 @@ public class 
PipeConfigPhysicalPlanPatternParseVisitorTest {
         ((PipeSetTTLPlan)
                 IoTDBConfigRegionExtractor.PATTERN_PARSE_VISITOR
                     .visitTTL(
-                        new SetTTLPlan(Arrays.asList("root", "*", "device", 
"s1"), Long.MAX_VALUE),
+                        new SetTTLPlan(Arrays.asList("root", "db", "**"), 
Long.MAX_VALUE),
                         prefixPathPattern)
                     .orElseThrow(AssertionError::new))
             .getSetTTLPlans();
 
     Assert.assertEquals(
-        Collections.singletonList(new PartialPath("root.db.device.s1")),
+        Collections.singletonList(new PartialPath("root.db.device.**")),
         plans.stream()
             .map(setTTLPlan -> new 
PartialPath(setTTLPlan.getDatabasePathPattern()))
             .collect(Collectors.toList()));

Reply via email to