This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b6c217abd3d Pipe: Fix delete inclusion aliases and metrics maps (#18002)
b6c217abd3d is described below

commit b6c217abd3d991638d6fca75709749e314cfc08e
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 23 10:21:54 2026 +0800

    Pipe: Fix delete inclusion aliases and metrics maps (#18002)
---
 .../metric/processor/PipeProcessorMetrics.java     |  3 +-
 .../metric/sink/PipeDataRegionSinkMetrics.java     |  3 +-
 .../options/PipeInclusionOptions.java              |  4 +-
 .../options/PipeInclusionOptionsTest.java          | 48 ++++++++++++++++++++++
 4 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/processor/PipeProcessorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/processor/PipeProcessorMetrics.java
index 4fb94c9bc13..97bb7f46341 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/processor/PipeProcessorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/processor/PipeProcessorMetrics.java
@@ -33,7 +33,6 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +44,7 @@ public class PipeProcessorMetrics implements IMetricSet {
   @SuppressWarnings("java:S3077")
   private volatile AbstractMetricService metricService;
 
-  private final Map<String, PipeProcessorSubtask> processorMap = new HashMap<>();
+  private final Map<String, PipeProcessorSubtask> processorMap = new ConcurrentHashMap<>();
 
   private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index 41f08f055c7..e2a391e8f70 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -35,7 +35,6 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,7 +46,7 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
   @SuppressWarnings("java:S3077")
   private volatile AbstractMetricService metricService;
 
-  private final Map<String, PipeSinkSubtask> sinkMap = new HashMap<>();
+  private final Map<String, PipeSinkSubtask> sinkMap = new ConcurrentHashMap<>();
 
   private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/options/PipeInclusionOptions.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/options/PipeInclusionOptions.java
index 4f3e59f0f1e..9a4f2d09ce7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/options/PipeInclusionOptions.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/options/PipeInclusionOptions.java
@@ -125,7 +125,7 @@ public class PipeInclusionOptions {
                 Arrays.asList(
                     "data.delete",
                     "schema.database.drop",
-                    "schema.timeseries.ordinary.delete",
+                    "schema.timeseries.ordinary.drop",
                     "schema.timeseries.view.drop",
                     "schema.timeseries.template.drop",
                     "schema.timeseries.template.unset",
@@ -140,7 +140,7 @@ public class PipeInclusionOptions {
             new HashSet<>(
                 Arrays.asList(
                     "schema.database.drop",
-                    "schema.timeseries.ordinary.delete",
+                    "schema.timeseries.ordinary.drop",
                     "schema.timeseries.view.drop",
                     "schema.timeseries.template.drop",
                     "schema.timeseries.template.unset",
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/options/PipeInclusionOptionsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/options/PipeInclusionOptionsTest.java
new file mode 100644
index 00000000000..63ab1c73993
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/options/PipeInclusionOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.options;
+
+import org.apache.iotdb.commons.path.PartialPath;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class PipeInclusionOptionsTest {
+
+  @Test
+  public void testDeleteAliasIncludesOrdinaryTimeseriesDrop() throws Exception {
+    final Set<PartialPath> options = PipeInclusionOptions.parseOptions("delete");
+
+    Assert.assertTrue(options.contains(new PartialPath("schema.timeseries.ordinary.drop")));
+    Assert.assertFalse(options.contains(new PartialPath("schema.timeseries.ordinary.delete")));
+    Assert.assertTrue(PipeInclusionOptions.optionsAreAllLegal("delete", true, false));
+  }
+
+  @Test
+  public void testSchemaDeleteAliasIncludesOrdinaryTimeseriesDrop() throws Exception {
+    final Set<PartialPath> options = PipeInclusionOptions.parseOptions("schema.delete");
+
+    Assert.assertTrue(options.contains(new PartialPath("schema.timeseries.ordinary.drop")));
+    Assert.assertFalse(options.contains(new PartialPath("schema.timeseries.ordinary.delete")));
+    Assert.assertTrue(PipeInclusionOptions.optionsAreAllLegal("schema.delete", true, false));
+  }
+}

Reply via email to