This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch priv-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/priv-fix by this push:
     new 23941cf8838 fix
23941cf8838 is described below

commit 23941cf8838f7fc595b00bca4473db2d34489f69
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jan 14 11:26:46 2026 +0800

    fix
---
 .../subtask/processor/PipeProcessorSubtask.java     | 21 ++++++++++++++++++++-
 .../db/pipe/event/common/PipeInsertionEvent.java    |  4 ++++
 .../common/tsfile/PipeTsFileInsertionEvent.java     |  4 ----
 .../dataregion/tsfile/TsFileResource.java           |  4 +++-
 4 files changed, 27 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index d8fbac9e44e..b481117c5e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
@@ -143,7 +144,25 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
       // event can be supplied after the subtask is closed, so we need to 
check isClosed here
       if (!isClosed.get()) {
         if (event instanceof TabletInsertionEvent) {
-          pipeProcessor.process((TabletInsertionEvent) event, 
outputEventCollector);
+          if (event instanceof PipeInsertNodeTabletInsertionEvent
+              && ((PipeInsertNodeTabletInsertionEvent) 
event).shouldParse4Privilege()) {
+            final AtomicReference<Exception> ex = new AtomicReference<>();
+            ((PipeInsertNodeTabletInsertionEvent) event)
+                .toRawTabletInsertionEvents()
+                .forEach(
+                    rawTabletInsertionEvent -> {
+                      try {
+                        pipeProcessor.process(rawTabletInsertionEvent, 
outputEventCollector);
+                      } catch (Exception e) {
+                        ex.set(e);
+                      }
+                    });
+            if (ex.get() != null) {
+              throw ex.get();
+            }
+          } else {
+            pipeProcessor.process((TabletInsertionEvent) event, 
outputEventCollector);
+          }
           PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
         } else if (event instanceof TsFileInsertionEvent) {
           // We have to parse the privilege first, to avoid passing 
no-privilege data to processor
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
index ce491b92ef4..3e1f4b476ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
@@ -170,4 +170,8 @@ public abstract class PipeInsertionEvent extends 
EnrichedEvent {
     this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
     this.treeModelDatabaseName = 
PathUtils.qualifyDatabaseName(tableModelDatabaseName);
   }
+
+  public boolean shouldParse4Privilege() {
+    return shouldParse4Privilege;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d3d601d5754..6b3e505d2ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -404,10 +404,6 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
     }
   }
 
-  public boolean shouldParse4Privilege() {
-    return shouldParse4Privilege;
-  }
-
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 9a827360f70..408be43ade6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -846,7 +846,9 @@ public class TsFileResource implements PersistentResource, 
Cloneable {
     // To release the memory occupied by pipe if held by it
     // Note that pipe can safely handle the case that the time index does not 
exist
     isEmpty();
-    degradeTimeIndex();
+    if (getStatus() != TsFileResourceStatus.UNCLOSED) {
+      degradeTimeIndex();
+    }
     try {
       fsFactory.deleteIfExists(file);
       fsFactory.deleteIfExists(

Reply via email to