This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e650cbb1e1 Pipe: Reduced the log of epoch switching & Optimized the 
memory calculation of insertion event & Refactor & Optimized the memory 
reservation logic of tsFile parser provider & Added the missing parsing logic + 
Fixed the wrong listening types to table meta sync & Skipped the file parsing 
in privilege for empty file and root user & Subscription IT: assertGte for 
received tsfile count (#15068)
8e650cbb1e1 is described below

commit 8e650cbb1e1ca9cc9023b87cdfc4ae7ec5cc090e
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 18 19:49:04 2025 +0800

    Pipe: Reduced the log of epoch switching & Optimized the memory calculation 
of insertion event & Refactor & Optimized the memory reservation logic of 
tsFile parser provider & Added the missing parsing logic + Fixed the wrong 
listening types to table meta sync & Skipped the file parsing in privilege for 
empty file and root user & Subscription IT: assertGte for received tsfile count 
(#15068)
    
    Co-authored-by: VGalaxies <[email protected]>
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../IoTDBPathLooseDeviceTsfilePushConsumerIT.java  | 10 +--
 .../IoTDBTimeLooseTsfilePushConsumerIT.java        | 10 +--
 .../IoTDBTSPatternTsfilePushConsumerIT.java        |  2 +-
 .../time/IoTDBRealTimeDBTsfilePushConsumerIT.java  |  8 +--
 .../time/IoTDBTimeRangeDBTsfilePushConsumerIT.java | 22 +++---
 .../write/pipe/payload/PipeDeleteDevicesPlan.java  | 44 +++---------
 .../request/write/table/AbstractTablePlan.java     | 15 +++-
 .../extractor/ConfigRegionListeningFilter.java     |  3 +-
 ...ConfigPhysicalPlanTablePatternParseVisitor.java | 62 +++++++---------
 ...nfigPhysicalPlanTablePrivilegeParseVisitor.java | 82 +++++++++++++---------
 .../receiver/protocol/IoTDBConfigNodeReceiver.java | 37 +---------
 ...igPhysicalPlanTablePatternParseVisitorTest.java |  9 +++
 .../protocol/opcda/OpcDaServerHandle.java          |  4 +-
 .../connector/protocol/opcua/OpcUaNameSpace.java   |  4 +-
 .../async/IoTDBDataRegionAsyncConnector.java       | 21 ------
 .../db/pipe/event/common/PipeInsertionEvent.java   | 13 ----
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  9 ++-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  4 +-
 .../tablet/parser/TabletInsertionEventParser.java  | 15 ++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 70 ++++++++++++------
 .../parser/TsFileInsertionEventParserProvider.java | 19 +++--
 .../PipeRealtimeDataRegionHybridExtractor.java     | 12 ++--
 .../pipe/resource/tsfile/PipeTsFileResource.java   |  2 +-
 23 files changed, 224 insertions(+), 253 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBPathLooseDeviceTsfilePushConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBPathLooseDeviceTsfilePushConsumerIT.java
index d6fbe7676e9..82b4d42c74e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBPathLooseDeviceTsfilePushConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBPathLooseDeviceTsfilePushConsumerIT.java
@@ -199,7 +199,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT 
extends AbstractSubscripti
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 1);
+          assertGte(onReceive.get(), 1);
           assertGte(rowCounts.get(0).get(), 3, "Write data before 
subscription" + device);
           assertGte(rowCounts.get(0).get(), 3, "Write data before 
subscription" + device2);
         });
@@ -211,7 +211,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT 
extends AbstractSubscripti
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 1);
+          assertGte(onReceive.get(), 1);
           assertGte(rowCounts.get(0).get(), 3, "Write out-of-range data" + 
device);
           assertGte(rowCounts.get(0).get(), 3, "Write out-of-range data" + 
device2);
         });
@@ -223,7 +223,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT 
extends AbstractSubscripti
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 2);
+          assertGte(onReceive.get(), 2);
           assertGte(rowCounts.get(0).get(), 8, "write data" + device);
           assertGte(rowCounts.get(0).get(), 8, "write data " + device2);
         });
@@ -235,7 +235,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT 
extends AbstractSubscripti
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 3);
+          assertGte(onReceive.get(), 3);
           assertGte(rowCounts.get(0).get(), 10, "Write data: end boundary at " 
+ device);
           assertGte(rowCounts.get(0).get(), 10, "Write data: end boundary at " 
+ device2);
         });
@@ -246,7 +246,7 @@ public class IoTDBPathLooseDeviceTsfilePushConsumerIT 
extends AbstractSubscripti
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 3);
+          assertGte(onReceive.get(), 3);
           assertGte(rowCounts.get(0).get(), 10, "Write data: > end " + device);
           assertGte(rowCounts.get(0).get(), 10, "Write data: > end " + 
device2);
         });
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsfilePushConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsfilePushConsumerIT.java
index 5571f1190b9..4691dd2be7f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsfilePushConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsfilePushConsumerIT.java
@@ -186,7 +186,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends 
AbstractSubscriptionRegr
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 1);
+          assertGte(onReceive.get(), 1);
           assertGte(rowCount.get(), 3);
         });
 
@@ -197,7 +197,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends 
AbstractSubscriptionRegr
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 1);
+          assertGte(onReceive.get(), 1);
           assertGte(rowCount.get(), 3);
         });
 
@@ -208,7 +208,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends 
AbstractSubscriptionRegr
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 2);
+          assertGte(onReceive.get(), 2);
           assertGte(rowCount.get(), 8);
         });
 
@@ -219,7 +219,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends 
AbstractSubscriptionRegr
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 3);
+          assertGte(onReceive.get(), 3);
           assertGte(rowCount.get(), 10);
         });
 
@@ -230,7 +230,7 @@ public class IoTDBTimeLooseTsfilePushConsumerIT extends 
AbstractSubscriptionRegr
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 3);
+          assertGte(onReceive.get(), 3);
           assertGte(rowCount.get(), 10);
         });
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
index bd4f94f8563..4b9bda04ead 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
@@ -218,7 +218,7 @@ public class IoTDBTSPatternTsfilePushConsumerIT extends 
AbstractSubscriptionRegr
     System.out.println(FORMAT.format(new Date()) + " src:" + 
getCount(session_src, sql));
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceiveCount.get(), 2, "receive files over 2");
+          assertGte(onReceiveCount.get(), 2, "receive files over 2");
           assertEquals(rowCounts.get(0).get(), 25, device + ".s_0");
           assertEquals(rowCounts.get(1).get(), 0, device + ".s_1");
           assertEquals(rowCounts.get(2).get(), 0, database + ".d_1.s_0");
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBTsfilePushConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBTsfilePushConsumerIT.java
index b1fe9d07937..3e465bb9710 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBTsfilePushConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBTsfilePushConsumerIT.java
@@ -164,8 +164,8 @@ public class IoTDBRealTimeDBTsfilePushConsumerIT extends 
AbstractSubscriptionReg
     insert_data(System.currentTimeMillis());
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 1, "should process 1 file");
-          assertEquals(rowCount.get(), 4, "4 records");
+          assertGte(onReceive.get(), 1, "should process 1 file");
+          assertGte(rowCount.get(), 4, "4 records");
         });
 
     // Subscribe and then write data
@@ -173,8 +173,8 @@ public class IoTDBRealTimeDBTsfilePushConsumerIT extends 
AbstractSubscriptionReg
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 2, "should process 2 file");
-          assertEquals(rowCount.get(), 8, "8 records");
+          assertGte(onReceive.get(), 2, "should process 2 file");
+          assertGte(rowCount.get(), 8, "8 records");
         });
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBTsfilePushConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBTsfilePushConsumerIT.java
index 2107d4e9bc1..fa97e511f30 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBTsfilePushConsumerIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBTsfilePushConsumerIT.java
@@ -162,38 +162,36 @@ public class IoTDBTimeRangeDBTsfilePushConsumerIT extends 
AbstractSubscriptionRe
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 1);
-          // loose-time should 2 records,get 4 records
-          assertTrue(rowCount.get() >= 2);
+          assertGte(onReceive.get(), 1);
+          assertGte(rowCount.get(), 2);
         });
 
     insert_data(System.currentTimeMillis()); // now, not in range
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 1);
-          assertTrue(rowCount.get() >= 2);
+          assertGte(onReceive.get(), 1);
+          assertGte(rowCount.get(), 2);
         });
 
     insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 2);
-          assertTrue(rowCount.get() >= 6);
+          assertGte(onReceive.get(), 2);
+          assertGte(rowCount.get(), 6);
         });
 
     insert_data(1711814398000L); // 2024-03-30 23:59:58+08:00
     AWAIT.untilAsserted(
         () -> {
-          // Because the end time is 2024-03-31 00:00:00, closed interval
-          assertEquals(onReceive.get(), 3);
-          assertTrue(rowCount.get() >= 8);
+          assertGte(onReceive.get(), 3);
+          assertGte(rowCount.get(), 8);
         });
 
     insert_data(1711900798000L); // 2024-03-31 23:59:58+08:00
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceive.get(), 3);
-          assertTrue(rowCount.get() >= 8);
+          assertGte(onReceive.get(), 3);
+          assertGte(rowCount.get(), 8);
         });
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java
index 90bfd9c433f..35dc0e929d9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeDeleteDevicesPlan.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.confignode.consensus.request.write.pipe.payload;
 
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import 
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
 
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
@@ -32,9 +32,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
-public class PipeDeleteDevicesPlan extends ConfigPhysicalPlan {
-  private String database;
-  private String tableName;
+public class PipeDeleteDevicesPlan extends AbstractTablePlan {
   private byte[] patternBytes;
   private byte[] filterBytes;
   private byte[] modBytes;
@@ -49,22 +47,12 @@ public class PipeDeleteDevicesPlan extends 
ConfigPhysicalPlan {
       final @Nonnull byte[] patternBytes,
       final @Nonnull byte[] filterBytes,
       final @Nonnull byte[] modBytes) {
-    super(ConfigPhysicalPlanType.PipeDeleteDevices);
-    this.database = database;
-    this.tableName = tableName;
+    super(ConfigPhysicalPlanType.PipeDeleteDevices, database, tableName);
     this.patternBytes = patternBytes;
     this.filterBytes = filterBytes;
     this.modBytes = modBytes;
   }
 
-  public String getDatabase() {
-    return database;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
   public byte[] getPatternBytes() {
     return patternBytes;
   }
@@ -79,9 +67,7 @@ public class PipeDeleteDevicesPlan extends ConfigPhysicalPlan 
{
 
   @Override
   protected void serializeImpl(final DataOutputStream stream) throws 
IOException {
-    stream.writeShort(getType().getPlanType());
-    ReadWriteIOUtils.write(database, stream);
-    ReadWriteIOUtils.write(tableName, stream);
+    super.serializeImpl(stream);
     ReadWriteIOUtils.write(patternBytes.length, stream);
     stream.write(patternBytes);
     ReadWriteIOUtils.write(filterBytes.length, stream);
@@ -92,8 +78,7 @@ public class PipeDeleteDevicesPlan extends ConfigPhysicalPlan 
{
 
   @Override
   protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
-    this.database = ReadWriteIOUtils.readString(buffer);
-    this.tableName = ReadWriteIOUtils.readString(buffer);
+    super.deserializeImpl(buffer);
     patternBytes = new byte[ReadWriteIOUtils.readInt(buffer)];
     buffer.get(patternBytes);
     filterBytes = new byte[ReadWriteIOUtils.readInt(buffer)];
@@ -104,25 +89,16 @@ public class PipeDeleteDevicesPlan extends 
ConfigPhysicalPlan {
 
   @Override
   public boolean equals(final Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    final PipeDeleteDevicesPlan that = (PipeDeleteDevicesPlan) obj;
-    return Objects.equals(this.database, that.database)
-        && Objects.equals(this.tableName, that.tableName)
-        && Arrays.equals(this.patternBytes, that.patternBytes)
-        && Arrays.equals(this.filterBytes, that.filterBytes)
-        && Arrays.equals(this.modBytes, that.modBytes);
+    return super.equals(obj)
+        && Arrays.equals(this.patternBytes, ((PipeDeleteDevicesPlan) 
obj).patternBytes)
+        && Arrays.equals(this.filterBytes, ((PipeDeleteDevicesPlan) 
obj).filterBytes)
+        && Arrays.equals(this.modBytes, ((PipeDeleteDevicesPlan) 
obj).modBytes);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-        database,
-        tableName,
+        super.hashCode(),
         Arrays.hashCode(patternBytes),
         Arrays.hashCode(filterBytes),
         Arrays.hashCode(modBytes));
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AbstractTablePlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AbstractTablePlan.java
index 6ee66ca94d4..98d50e4ca33 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AbstractTablePlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AbstractTablePlan.java
@@ -27,8 +27,9 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
-abstract class AbstractTablePlan extends ConfigPhysicalPlan {
+public abstract class AbstractTablePlan extends ConfigPhysicalPlan {
 
   private String database;
 
@@ -53,6 +54,18 @@ abstract class AbstractTablePlan extends ConfigPhysicalPlan {
     return tableName;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final AbstractTablePlan that = (AbstractTablePlan) o;
+    return Objects.equals(database, that.database) && 
Objects.equals(tableName, that.tableName);
+  }
+
   @Override
   protected void serializeImpl(final DataOutputStream stream) throws 
IOException {
     stream.writeShort(getType().getPlanType());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index 9af665d74e0..82d23b76350 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -171,6 +171,7 @@ public class ConfigRegionListeningFilter {
           Collections.unmodifiableList(
               Arrays.asList(
                   ConfigPhysicalPlanType.CreateUser,
+                  ConfigPhysicalPlanType.RCreateUser,
                   ConfigPhysicalPlanType.CreateUserWithRawPassword)));
       OPTION_PLAN_MAP.put(
           new PartialPath("auth.user.alter"),
@@ -180,7 +181,7 @@ public class ConfigRegionListeningFilter {
       OPTION_PLAN_MAP.put(
           new PartialPath("auth.user.drop"),
           Collections.unmodifiableList(
-              Arrays.asList(ConfigPhysicalPlanType.DropUser, 
ConfigPhysicalPlanType.RUpdateUser)));
+              Arrays.asList(ConfigPhysicalPlanType.DropUser, 
ConfigPhysicalPlanType.RDropUser)));
 
       // Both
       OPTION_PLAN_MAP.put(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java
index 42eeda61284..1d7fd17b628 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitor.java
@@ -27,6 +27,8 @@ import 
org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelational
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
@@ -77,10 +79,9 @@ public class PipeConfigPhysicalPlanTablePatternParseVisitor
   @Override
   public Optional<ConfigPhysicalPlan> visitPipeCreateTable(
       final PipeCreateTablePlan pipeCreateTablePlan, final TablePattern 
pattern) {
-    return matchDatabaseAndTableName(
-            pipeCreateTablePlan.getDatabase(),
-            pipeCreateTablePlan.getTable().getTableName(),
-            pattern)
+    return pattern.matchesDatabase(
+                
PathUtils.unQualifyDatabaseName(pipeCreateTablePlan.getDatabase()))
+            && 
pattern.matchesTable(pipeCreateTablePlan.getTable().getTableName())
         ? Optional.of(pipeCreateTablePlan)
         : Optional.empty();
   }
@@ -88,72 +89,57 @@ public class PipeConfigPhysicalPlanTablePatternParseVisitor
   @Override
   public Optional<ConfigPhysicalPlan> visitAddTableColumn(
       final AddTableColumnPlan addTableColumnPlan, final TablePattern pattern) 
{
-    return matchDatabaseAndTableName(
-            addTableColumnPlan.getDatabase(), 
addTableColumnPlan.getTableName(), pattern)
-        ? Optional.of(addTableColumnPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(addTableColumnPlan, pattern);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitSetTableProperties(
       final SetTablePropertiesPlan setTablePropertiesPlan, final TablePattern 
pattern) {
-    return matchDatabaseAndTableName(
-            setTablePropertiesPlan.getDatabase(), 
setTablePropertiesPlan.getTableName(), pattern)
-        ? Optional.of(setTablePropertiesPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(setTablePropertiesPlan, pattern);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitCommitDeleteColumn(
       final CommitDeleteColumnPlan commitDeleteColumnPlan, final TablePattern 
pattern) {
-    return matchDatabaseAndTableName(
-            commitDeleteColumnPlan.getDatabase(), 
commitDeleteColumnPlan.getTableName(), pattern)
-        ? Optional.of(commitDeleteColumnPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(commitDeleteColumnPlan, pattern);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitRenameTableColumn(
       final RenameTableColumnPlan renameTableColumnPlan, final TablePattern 
pattern) {
-    return matchDatabaseAndTableName(
-            renameTableColumnPlan.getDatabase(), 
renameTableColumnPlan.getTableName(), pattern)
-        ? Optional.of(renameTableColumnPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(renameTableColumnPlan, pattern);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitCommitDeleteTable(
       final CommitDeleteTablePlan commitDeleteTablePlan, final TablePattern 
pattern) {
-    return matchDatabaseAndTableName(
-            commitDeleteTablePlan.getDatabase(), 
commitDeleteTablePlan.getTableName(), pattern)
-        ? Optional.of(commitDeleteTablePlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(commitDeleteTablePlan, pattern);
+  }
+
+  @Override
+  public Optional<ConfigPhysicalPlan> visitPipeDeleteDevices(
+      final PipeDeleteDevicesPlan pipeDeleteDevicesPlan, final TablePattern 
pattern) {
+    return visitAbstractTablePlan(pipeDeleteDevicesPlan, pattern);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitSetTableComment(
       final SetTableCommentPlan setTableCommentPlan, final TablePattern 
pattern) {
-    return matchDatabaseAndTableName(
-            setTableCommentPlan.getDatabase(), 
setTableCommentPlan.getTableName(), pattern)
-        ? Optional.of(setTableCommentPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(setTableCommentPlan, pattern);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitSetTableColumnComment(
       final SetTableColumnCommentPlan setTableColumnCommentPlan, final 
TablePattern pattern) {
-    return matchDatabaseAndTableName(
-            setTableColumnCommentPlan.getDatabase(),
-            setTableColumnCommentPlan.getTableName(),
-            pattern)
-        ? Optional.of(setTableColumnCommentPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(setTableColumnCommentPlan, pattern);
   }
 
-  private boolean matchDatabaseAndTableName(
-      final String database, final String tableName, final TablePattern 
pattern) {
-    return pattern.matchesDatabase(PathUtils.unQualifyDatabaseName(database))
-        && pattern.matchesTable(tableName);
+  private Optional<ConfigPhysicalPlan> visitAbstractTablePlan(
+      final AbstractTablePlan plan, final TablePattern pattern) {
+    return 
pattern.matchesDatabase(PathUtils.unQualifyDatabaseName(plan.getDatabase()))
+            && pattern.matchesTable(plan.getTableName())
+        ? Optional.of(plan)
+        : Optional.empty();
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePrivilegeParseVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePrivilegeParseVisitor.java
index e5f4a170f06..28c6fb8c4cf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePrivilegeParseVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePrivilegeParseVisitor.java
@@ -27,10 +27,14 @@ import 
org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelational
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.RenameTableColumnPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.table.SetTableColumnCommentPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.table.SetTableCommentPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.SetTablePropertiesPlan;
 import org.apache.iotdb.confignode.service.ConfigNode;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -90,10 +94,18 @@ public class 
PipeConfigPhysicalPlanTablePrivilegeParseVisitor
   @Override
   public Optional<ConfigPhysicalPlan> visitPipeCreateTable(
       final PipeCreateTablePlan pipeCreateTablePlan, final String userName) {
-    return matchDatabaseAndTableName(
-            pipeCreateTablePlan.getDatabase(),
-            pipeCreateTablePlan.getTable().getTableName(),
-            userName)
+    return ConfigNode.getInstance()
+                .getConfigManager()
+                .getPermissionManager()
+                .checkUserPrivileges(
+                    userName,
+                    new PrivilegeUnion(
+                        pipeCreateTablePlan.getDatabase(),
+                        pipeCreateTablePlan.getTable().getTableName(),
+                        null))
+                .getStatus()
+                .getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()
         ? Optional.of(pipeCreateTablePlan)
         : Optional.empty();
   }
@@ -101,57 +113,63 @@ public class 
PipeConfigPhysicalPlanTablePrivilegeParseVisitor
   @Override
   public Optional<ConfigPhysicalPlan> visitAddTableColumn(
       final AddTableColumnPlan addTableColumnPlan, final String userName) {
-    return matchDatabaseAndTableName(
-            addTableColumnPlan.getDatabase(), 
addTableColumnPlan.getTableName(), userName)
-        ? Optional.of(addTableColumnPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(addTableColumnPlan, userName);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitSetTableProperties(
       final SetTablePropertiesPlan setTablePropertiesPlan, final String 
userName) {
-    return matchDatabaseAndTableName(
-            setTablePropertiesPlan.getDatabase(), 
setTablePropertiesPlan.getTableName(), userName)
-        ? Optional.of(setTablePropertiesPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(setTablePropertiesPlan, userName);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitCommitDeleteColumn(
       final CommitDeleteColumnPlan commitDeleteColumnPlan, final String 
userName) {
-    return matchDatabaseAndTableName(
-            commitDeleteColumnPlan.getDatabase(), 
commitDeleteColumnPlan.getTableName(), userName)
-        ? Optional.of(commitDeleteColumnPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(commitDeleteColumnPlan, userName);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitRenameTableColumn(
       final RenameTableColumnPlan renameTableColumnPlan, final String 
userName) {
-    return matchDatabaseAndTableName(
-            renameTableColumnPlan.getDatabase(), 
renameTableColumnPlan.getTableName(), userName)
-        ? Optional.of(renameTableColumnPlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(renameTableColumnPlan, userName);
   }
 
   @Override
   public Optional<ConfigPhysicalPlan> visitCommitDeleteTable(
       final CommitDeleteTablePlan commitDeleteTablePlan, final String 
userName) {
-    return matchDatabaseAndTableName(
-            commitDeleteTablePlan.getDatabase(), 
commitDeleteTablePlan.getTableName(), userName)
-        ? Optional.of(commitDeleteTablePlan)
-        : Optional.empty();
+    return visitAbstractTablePlan(commitDeleteTablePlan, userName);
+  }
+
+  @Override
+  public Optional<ConfigPhysicalPlan> visitPipeDeleteDevices(
+      final PipeDeleteDevicesPlan pipeDeleteDevicesPlan, final String 
userName) {
+    return visitAbstractTablePlan(pipeDeleteDevicesPlan, userName);
   }
 
-  private boolean matchDatabaseAndTableName(
-      final String database, final String tableName, final String userName) {
+  @Override
+  public Optional<ConfigPhysicalPlan> visitSetTableComment(
+      final SetTableCommentPlan setTableCommentPlan, final String userName) {
+    return visitAbstractTablePlan(setTableCommentPlan, userName);
+  }
+
+  @Override
+  public Optional<ConfigPhysicalPlan> visitSetTableColumnComment(
+      final SetTableColumnCommentPlan setTableColumnCommentPlan, final String 
userName) {
+    return visitAbstractTablePlan(setTableColumnCommentPlan, userName);
+  }
+
+  private Optional<ConfigPhysicalPlan> visitAbstractTablePlan(
+      final AbstractTablePlan plan, final String userName) {
     return ConfigNode.getInstance()
-            .getConfigManager()
-            .getPermissionManager()
-            .checkUserPrivileges(userName, new PrivilegeUnion(database, 
tableName, null))
-            .getStatus()
-            .getCode()
-        == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+                .getConfigManager()
+                .getPermissionManager()
+                .checkUserPrivileges(
+                    userName, new PrivilegeUnion(plan.getDatabase(), 
plan.getTableName(), null))
+                .getStatus()
+                .getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? Optional.of(plan)
+        : Optional.empty();
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 4cbb001fb09..b1a3dfac009 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -56,6 +56,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDele
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
@@ -334,48 +335,16 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     PrivilegeType.CREATE))
             .getStatus();
       case AddTableColumn:
-        return configManager
-            .checkUserPrivileges(
-                username,
-                new PrivilegeUnion(
-                    ((AddTableColumnPlan) plan).getDatabase(),
-                    ((AddTableColumnPlan) plan).getTableName(),
-                    PrivilegeType.ALTER))
-            .getStatus();
       case SetTableProperties:
-        return configManager
-            .checkUserPrivileges(
-                username,
-                new PrivilegeUnion(
-                    ((SetTablePropertiesPlan) plan).getDatabase(),
-                    ((SetTablePropertiesPlan) plan).getTableName(),
-                    PrivilegeType.ALTER))
-            .getStatus();
       case CommitDeleteColumn:
-        return configManager
-            .checkUserPrivileges(
-                username,
-                new PrivilegeUnion(
-                    ((CommitDeleteColumnPlan) plan).getDatabase(),
-                    ((CommitDeleteColumnPlan) plan).getTableName(),
-                    PrivilegeType.ALTER))
-            .getStatus();
       case SetTableComment:
-        return configManager
-            .checkUserPrivileges(
-                username,
-                new PrivilegeUnion(
-                    ((SetTableCommentPlan) plan).getDatabase(),
-                    ((SetTableCommentPlan) plan).getTableName(),
-                    PrivilegeType.ALTER))
-            .getStatus();
       case SetTableColumnComment:
         return configManager
             .checkUserPrivileges(
                 username,
                 new PrivilegeUnion(
-                    ((SetTableColumnCommentPlan) plan).getDatabase(),
-                    ((SetTableColumnCommentPlan) plan).getTableName(),
+                    ((AbstractTablePlan) plan).getDatabase(),
+                    ((AbstractTablePlan) plan).getTableName(),
                     PrivilegeType.ALTER))
             .getStatus();
       case CommitDeleteTable:
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
index df6c5050eb9..65db59338c1 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/extractor/PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelational
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
@@ -116,6 +117,14 @@ public class 
PipeConfigPhysicalPlanTablePatternParseVisitorTest {
         new CommitDeleteTablePlan("da", "ac"));
   }
 
+  @Test
+  public void testPipeDeleteDevices() {
+    testInput(
+        new PipeDeleteDevicesPlan("db1", "ab", new byte[0], new byte[0], new 
byte[0]),
+        new PipeDeleteDevicesPlan("db1", "ac", new byte[0], new byte[0], new 
byte[0]),
+        new PipeDeleteDevicesPlan("da", "ac", new byte[0], new byte[0], new 
byte[0]));
+  }
+
   @Test
   public void testSetTableComment() {
     testInput(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
index 1c9e8c77315..167fd4988f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
@@ -194,9 +194,7 @@ public class OpcDaServerHandle implements Closeable {
         addItem(itemId, schemas.get(i).getType());
       }
       for (int j = tablet.getRowSize() - 1; j >= 0; --j) {
-        if (Objects.isNull(tablet.getBitMaps())
-            || Objects.isNull(tablet.getBitMaps()[i])
-            || !tablet.getBitMaps()[i].isMarked(j)) {
+        if (!tablet.isNull(j, i)) {
           if (serverTimestampMap.get(itemId) <= tablet.getTimestamp(j)) {
             writeData(
                 itemId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
index 8beccbde59b..acb759df111 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
@@ -124,9 +124,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
 
       for (int i = 0; i < schemas.size(); ++i) {
         for (int j = tablet.getRowSize() - 1; j >= 0; --j) {
-          if (Objects.isNull(tablet.getBitMaps())
-              || Objects.isNull(tablet.getBitMaps()[i])
-              || !tablet.getBitMaps()[i].isMarked(j)) {
+          if (!tablet.isNull(j, i)) {
             newSchemas.add(schemas.get(i));
             timestamps.add(tablet.getTimestamp(j));
             values.add(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index d3c3c73b728..4873e1d1fb7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -614,27 +614,6 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     return retryEventQueue.size();
   }
 
-  // For performance, this will not acquire lock and does not guarantee the 
correct
-  // result. However, this shall not cause any exceptions when concurrently 
read & written.
-  public int getRetryEventCount(final String pipeName) {
-    final AtomicInteger count = new AtomicInteger(0);
-    try {
-      retryEventQueue.forEach(
-          event -> {
-            if (event instanceof EnrichedEvent
-                && pipeName.equals(((EnrichedEvent) event).getPipeName())) {
-              count.incrementAndGet();
-            }
-          });
-      return count.get();
-    } catch (final Exception e) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Failed to get retry event count for pipe {}.", pipeName, 
e);
-      }
-      return count.get();
-    }
-  }
-
   //////////////////////// APIs provided for PipeTransferTrackableHandler 
////////////////////////
 
   public boolean isClosed() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
index 7b0437fba06..5be67305d64 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
@@ -28,9 +28,6 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
 import javax.validation.constraints.NotNull;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
  * The data model used to record the Event and the data model of the 
DataRegion corresponding to the
  * source data, so this type requires some specifications .
@@ -60,8 +57,6 @@ public abstract class PipeInsertionEvent extends 
EnrichedEvent {
   protected String treeModelDatabaseName; // lazy initialization
   protected String tableModelDatabaseName; // lazy initialization
 
-  private Set<String> noPrivilegeTableNames = new HashSet<>();
-
   protected PipeInsertionEvent(
       final String pipeName,
       final long creationTime,
@@ -174,12 +169,4 @@ public abstract class PipeInsertionEvent extends 
EnrichedEvent {
     this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
     this.treeModelDatabaseName = 
PathUtils.qualifyDatabaseName(tableModelDatabaseName);
   }
-
-  public void addTable(final String tableName) {
-    noPrivilegeTableNames.add(tableName);
-  }
-
-  public Set<String> getNoPrivilegeTableNames() {
-    return noPrivilegeTableNames;
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 84976cdab57..598eeaed0da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -85,7 +85,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
           + RamUsageEstimator.shallowSizeOfInstance(WALEntryHandler.class)
           + RamUsageEstimator.shallowSizeOfInstance(WALEntryPosition.class)
           + RamUsageEstimator.shallowSizeOfInstance(AtomicInteger.class)
-          + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class);
+          + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class)
+          + RamUsageEstimator.shallowSizeOf(Boolean.class);
   private static final long SET_SIZE = 
RamUsageEstimator.shallowSizeOfInstance(HashSet.class);
 
   private final WALEntryHandler walEntryHandler;
@@ -579,6 +580,12 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
     return INSTANCE_SIZE
         + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath) 
: 0)
         + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)
+        + (Objects.nonNull(treeModelDatabaseName)
+            ? RamUsageEstimator.sizeOf(treeModelDatabaseName)
+            : 0)
+        + (Objects.nonNull(tableModelDatabaseName)
+            ? RamUsageEstimator.sizeOf(tableModelDatabaseName)
+            : 0)
         + (Objects.nonNull(tableNames)
             ? SET_SIZE
                 + 
tableNames.stream().mapToLong(RamUsageEstimator::sizeOf).reduce(0L, Long::sum)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 69076810175..10b664cf4b5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -363,8 +363,8 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
   }
 
   public long count() {
-    final Tablet covertedTablet = shouldParseTimeOrPattern() ? 
convertToTablet() : tablet;
-    return (long) covertedTablet.getRowSize() * 
covertedTablet.getSchemas().size();
+    final Tablet convertedTablet = shouldParseTimeOrPattern() ? 
convertToTablet() : tablet;
+    return (long) convertedTablet.getRowSize() * 
convertedTablet.getSchemas().size();
   }
 
   /////////////////////////// parsePatternOrTime ///////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index 13c7c0e5c33..713e5da872c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -142,8 +142,8 @@ public abstract class TabletInsertionEventParser {
     final MeasurementSchema[] originMeasurementSchemaList = 
insertRowNode.getMeasurementSchemas();
     final String[] originColumnNameStringList = 
insertRowNode.getMeasurements();
     final TsTableColumnCategory[] originColumnCategories = 
insertRowNode.getColumnCategories();
-    final TSDataType[] originValueColumnDataTypes = 
insertRowNode.getDataTypes();
-    final Object[] originValueColumns = insertRowNode.getValues();
+    final TSDataType[] originValueDataTypes = insertRowNode.getDataTypes();
+    final Object[] originValues = insertRowNode.getValues();
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
@@ -154,12 +154,11 @@ public abstract class TabletInsertionEventParser {
             originColumnCategories != null && originColumnCategories[i] != null
                 ? originColumnCategories[i].toTsFileColumnType()
                 : Tablet.ColumnCategory.FIELD;
-        this.valueColumnDataTypes[filteredColumnIndex] = 
originValueColumnDataTypes[i];
+        this.valueColumnDataTypes[filteredColumnIndex] = 
originValueDataTypes[i];
         final BitMap bitMap = new BitMap(this.timestampColumn.length);
-        if (Objects.isNull(originValueColumns[i])
-            || Objects.isNull(originValueColumnDataTypes[i])) {
+        if (Objects.isNull(originValues[i]) || 
Objects.isNull(originValueDataTypes[i])) {
           fillNullValue(
-              originValueColumnDataTypes[i],
+              originValueDataTypes[i],
               this.valueColumns,
               bitMap,
               filteredColumnIndex,
@@ -167,8 +166,8 @@ public abstract class TabletInsertionEventParser {
         } else {
           this.valueColumns[filteredColumnIndex] =
               filterValueColumnsByRowIndexList(
-                  originValueColumnDataTypes[i],
-                  originValueColumns[i],
+                  originValueDataTypes[i],
+                  originValues[i],
                   rowIndexList,
                   true,
                   bitMap, // use the output bitmap since there is no bitmap in 
InsertRowNode
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 060f62f15cc..19a4913429f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -52,7 +54,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.AccessDeniedException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -429,29 +430,52 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
   }
 
   @Override
-  public void throwIfNoPrivilege() throws IOException {
-    if (!isTableModelEvent()) {
-      return;
-    }
-    for (final IDeviceID deviceID : getDeviceSet()) {
-      if (!tablePattern.matchesDatabase(getTableModelDatabaseName())
-          || !tablePattern.matchesTable(deviceID.getTableName())) {
-        continue;
+  public void throwIfNoPrivilege() {
+    try {
+      if (!isTableModelEvent() || 
AuthorityChecker.SUPER_USER.equals(userName)) {
+        return;
+      }
+      if (!waitForTsFileClose()) {
+        LOGGER.info("Temporary tsFile {} detected, will skip its transfer.", 
tsFile);
+        return;
       }
-      if (!Coordinator.getInstance()
-          .getAccessControl()
-          .checkCanSelectFromTable4Pipe(
-              userName,
-              new QualifiedObjectName(getTableModelDatabaseName(), 
deviceID.getTableName()))) {
-        if (skipIfNoPrivileges) {
-          shouldParse4Privilege = true;
-        } else {
-          throw new AccessDeniedException(
-              String.format(
-                  "No privilege for SELECT for user %s at table %s.%s",
-                  userName, tableModelDatabaseName, deviceID.getTableName()));
+      for (final IDeviceID deviceID : getDeviceSet()) {
+        if (!tablePattern.matchesDatabase(getTableModelDatabaseName())
+            || !tablePattern.matchesTable(deviceID.getTableName())) {
+          continue;
         }
+        if (!Coordinator.getInstance()
+            .getAccessControl()
+            .checkCanSelectFromTable4Pipe(
+                userName,
+                new QualifiedObjectName(getTableModelDatabaseName(), 
deviceID.getTableName()))) {
+          if (skipIfNoPrivileges) {
+            shouldParse4Privilege = true;
+          } else {
+            throw new AccessDeniedException(
+                String.format(
+                    "No privilege for SELECT for user %s at table %s.%s",
+                    userName, tableModelDatabaseName, 
deviceID.getTableName()));
+          }
+        }
+      }
+    } catch (final AccessDeniedException e) {
+      throw e;
+    } catch (final Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
       }
+
+      final String errorMsg =
+          e instanceof InterruptedException
+              ? String.format(
+                  "Interrupted when waiting for parsing privilege for TsFile 
%s.",
+                  resource.getTsFilePath())
+              : String.format(
+                  "Parse TsFile %s when checking privilege error. Because: %s",
+                  resource.getTsFilePath(), e.getMessage());
+      LOGGER.warn(errorMsg, e);
+      throw new PipeException(errorMsg, e);
     }
   }
 
@@ -578,7 +602,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
               : String.format(
                   "Parse TsFile %s error. Because: %s", 
resource.getTsFilePath(), e.getMessage());
       LOGGER.warn(errorMsg, e);
-      throw new PipeException(errorMsg);
+      throw new PipeException(errorMsg, e);
     }
   }
 
@@ -656,7 +680,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
 
       final String errorMsg = String.format("Read TsFile %s error.", 
resource.getTsFilePath());
       LOGGER.warn(errorMsg, e);
-      throw new PipeException(errorMsg);
+      throw new PipeException(errorMsg, e);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
index d90f0ad067e..200130f6e14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertion
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
 
@@ -75,13 +76,17 @@ public class TsFileInsertionEventParserProvider {
           tsFile, tablePattern, startTime, endTime, pipeTaskMeta, userName, 
sourceEvent);
     }
 
-    if (startTime != Long.MIN_VALUE
-        || endTime != Long.MAX_VALUE
-        || treePattern instanceof IoTDBTreePattern
-            && !((IoTDBTreePattern) 
treePattern).mayMatchMultipleTimeSeriesInOneDevice()) {
-      // 1. If time filter exists, use query here because the scan container 
may filter it
-      // row by row in single page chunk.
-      // 2. If the pattern matches only one time series in one device, use 
query container here
+    // Use scan container to save memory
+    if ((double) 
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()
+            / PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+        > PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) {
+      return new TsFileInsertionEventScanParser(
+          tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+    }
+
+    if (treePattern instanceof IoTDBTreePattern
+        && !((IoTDBTreePattern) 
treePattern).mayMatchMultipleTimeSeriesInOneDevice()) {
+      // If the pattern matches only one time series in one device, use query 
container here
       // because there is no timestamps merge overhead.
       //
       // Note: We judge prefix pattern as matching multiple timeseries in one 
device because it's
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index cf5e88a72a6..38dcd652e87 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -82,13 +82,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private void extractTabletInsertion(final PipeRealtimeEvent event) {
-    if (canNotUseTabletAnyMore(event)) {
+    TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+
+    if (state != TsFileEpoch.State.USING_TSFILE
+        && state != TsFileEpoch.State.USING_BOTH
+        && canNotUseTabletAnyMore(event)) {
       event
           .getTsFileEpoch()
           .migrateState(
               this,
-              state -> {
-                switch (state) {
+              curState -> {
+                switch (curState) {
                   case EMPTY:
                   case USING_TSFILE:
                     return TsFileEpoch.State.USING_TSFILE;
@@ -100,7 +104,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
               });
     }
 
-    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+    state = event.getTsFileEpoch().getState(this);
     switch (state) {
       case USING_TSFILE:
         // Ignore the tablet event.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 2cb3a59c399..dd3ca9c5bc0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -49,7 +49,7 @@ public class PipeTsFileResource implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileResource.class);
 
   public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
-  private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.7f;
+  public static final float MEMORY_SUFFICIENT_THRESHOLD = 0.7f;
 
   private final File hardlinkOrCopiedFile;
   private final boolean isTsFile;

Reply via email to