This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 623a9c3fe3a Pipe: Fixed the bug that privilege is not filtered for 
real time events && Fixed the bug that the unclosed file may be degraded when 
directly deleted && Fixed the skip-if problem of the write-back-sink && Fixed 
the problem of tsFile privilege parsing (#17024)
623a9c3fe3a is described below

commit 623a9c3fe3a3a1e84ddd340280b6eff3b5a236c1
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jan 14 21:44:18 2026 +0800

    Pipe: Fixed the bug that privilege is not filtered for real time events && 
Fixed the bug that the unclosed file may be degraded when directly deleted && 
Fixed the skip-if problem of the write-back-sink && Fixed the problem of tsFile 
privilege parsing (#17024)
    
    * fix
    
    * wb
    
    * reorder
    
    * fix
    
    * IT
    
    * fix
    
    * fldz
    
    * gr
    
    * fix
    
    * 1c1e
    
    * register-more
---
 .../treemodel/manual/IoTDBPipePermissionIT.java    | 36 ++++++++++++---
 .../pipe/it/single/IoTDBPipePermissionIT.java      | 51 ++++++++++++++++++++++
 .../subtask/processor/PipeProcessorSubtask.java    | 21 ++++++++-
 .../db/pipe/event/common/PipeInsertionEvent.java   |  4 ++
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 --
 .../scan/TsFileInsertionEventScanParser.java       |  3 +-
 .../table/TsFileInsertionEventTableParser.java     | 16 ++++++-
 .../sink/protocol/writeback/WriteBackSink.java     | 14 +++++-
 .../dataregion/tsfile/TsFileResource.java          |  6 ++-
 9 files changed, 136 insertions(+), 19 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index b0f2feffe42..7f35bb1743f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeManual;
-import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
@@ -364,6 +363,27 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv, "count databases root.test1", "count,", 
Collections.singleton("1,"));
 
+    // Write some data
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "create timeSeries root.vehicle.car.temperature DOUBLE",
+            "insert into root.vehicle.car(temperature) values (36.5)"));
+
+    // Exception, skip
+    TestUtils.assertDataAlwaysOnEnv(
+        receiverEnv, "count timeSeries", "count(timeseries),", 
Collections.singleton("0,"));
+
+    // Provide time series
+    TestUtils.executeNonQuery(receiverEnv, "create timeSeries 
root.vehicle.car.temperature DOUBLE");
+
+    // Exception, skip: the series now exists on the receiver but must stay empty
+    TestUtils.assertDataAlwaysOnEnv(
+        receiverEnv,
+        "select count(temperature) from root.vehicle.car",
+        "count(root.vehicle.car.temperature),",
+        Collections.singleton("0,"));
+
     // Alter pipe, throw exception if no privileges
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
@@ -377,13 +397,15 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
     TestUtils.executeNonQueries(
         senderEnv,
         Arrays.asList(
-            "create timeSeries root.vehicle.car.temperature DOUBLE",
-            "insert into root.vehicle.car(temperature) values (36.5)"));
+            "create timeSeries root.vehicle.car.pressure DOUBLE",
+            "insert into root.vehicle.car(pressure) values (36.5)"));
 
     // Exception, block here
-    TableModelUtils.assertCountDataAlwaysOnEnv("test", "test", 0, receiverEnv);
     TestUtils.assertDataAlwaysOnEnv(
-        receiverEnv, "count timeSeries", "count(timeseries),", 
Collections.singleton("0,"));
+        receiverEnv,
+        "select count(pressure) from root.vehicle.car",
+        "count(root.vehicle.car.pressure),",
+        Collections.singleton("0,"));
 
     // Grant SELECT privilege
     TestUtils.executeNonQueries(
@@ -392,8 +414,8 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
     // Will finally pass
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv,
-        "select count(*) from root.vehicle.**",
-        "count(root.vehicle.car.temperature),",
+        "select count(pressure) from root.vehicle.car",
+        "count(root.vehicle.car.pressure),",
         Collections.singleton("1,"));
 
     // test showing pipe
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
index 6779adabf84..c7fa9d12c4e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
@@ -20,12 +20,14 @@
 package org.apache.iotdb.pipe.it.single;
 
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT1;
 import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -41,6 +43,23 @@ import static org.junit.Assert.fail;
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT1.class})
 public class IoTDBPipePermissionIT extends AbstractPipeSingleIT {
+
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(1);
+    env = MultiEnvFactory.getEnv(0);
+    env.getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setPipeMemoryManagementEnabled(false)
+        .setDataReplicationFactor(1)
+        .setSchemaReplicationFactor(1)
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
+    // 1C1D to directly show the remaining count
+    env.initClusterEnvironment(1, 1);
+  }
+
   @Test
   public void testSinkPermission() {
     TestUtils.executeNonQuery(env, "create user `thulab` 'StrngPsWd@623451'", 
null);
@@ -166,6 +185,38 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeSingleIT {
     } catch (Exception e) {
       fail(e.getMessage());
     }
+
+    // Tree model
+    try (final Connection connection = env.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE root.test_sink");
+      statement.execute(
+          "CREATE ALIGNED TIMESERIES root.test_sink.d1(s_int INT32,s_long 
INT64,s_float float,s_double double)");
+      statement.execute(
+          "INSERT INTO root.test_sink.d1(time,s_int,s_long,s_float,s_double) 
VALUES (2025-01-01 03:50:14,1,1,1,1),(2025-01-01 
03:50:30,12,14,16.24,18.5),(2025-01-01 03:51:45,22,24,26.24,28.5),(2025-01-01 
03:51:59,32,34,36.46,38.6),(2025-01-01 03:52:00,10,14,16.46,18.6)");
+      statement.execute("CREATE USER user_new 'paSs1234@56789'");
+      // Do not grant SYSTEM privilege to avoid auto creation
+      statement.execute("GRANT write ON root.** TO USER user_new");
+      statement.execute(
+          "create pipe user_sink_pipe with source 
('pattern'='root.test_sink.d1') with processor 
('processor'='aggregate-processor', 'output.database'='root.user_sink_db', 
'operators'='avg') with sink 
('sink'='write-back-sink','user'='user_new','password'='paSs1234@56789')");
+
+      final long startTime = System.currentTimeMillis();
+      while (System.currentTimeMillis() - startTime <= 20_000L) {
+        try (final ResultSet set = statement.executeQuery("show pipe 
user_sink_pipe")) {
+          Assert.assertTrue(set.next());
+          try {
+            Assert.assertEquals("0", set.getString(8));
+            return;
+          } catch (final Throwable t) {
+            // Retry
+          }
+        }
+      }
+      Assert.fail();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   @Test
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index d8fbac9e44e..b481117c5e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
@@ -143,7 +144,25 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
       // event can be supplied after the subtask is closed, so we need to 
check isClosed here
       if (!isClosed.get()) {
         if (event instanceof TabletInsertionEvent) {
-          pipeProcessor.process((TabletInsertionEvent) event, 
outputEventCollector);
+          if (event instanceof PipeInsertNodeTabletInsertionEvent
+              && ((PipeInsertNodeTabletInsertionEvent) 
event).shouldParse4Privilege()) {
+            final AtomicReference<Exception> ex = new AtomicReference<>();
+            ((PipeInsertNodeTabletInsertionEvent) event)
+                .toRawTabletInsertionEvents()
+                .forEach(
+                    rawTabletInsertionEvent -> {
+                      try {
+                        pipeProcessor.process(rawTabletInsertionEvent, 
outputEventCollector);
+                      } catch (Exception e) {
+                        ex.set(e);
+                      }
+                    });
+            if (ex.get() != null) {
+              throw ex.get();
+            }
+          } else {
+            pipeProcessor.process((TabletInsertionEvent) event, 
outputEventCollector);
+          }
           PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
         } else if (event instanceof TsFileInsertionEvent) {
           // We have to parse the privilege first, to avoid passing 
no-privilege data to processor
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
index ce491b92ef4..3e1f4b476ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
@@ -170,4 +170,8 @@ public abstract class PipeInsertionEvent extends 
EnrichedEvent {
     this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
     this.treeModelDatabaseName = 
PathUtils.qualifyDatabaseName(tableModelDatabaseName);
   }
+
+  public boolean shouldParse4Privilege() {
+    return shouldParse4Privilege;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d3d601d5754..6b3e505d2ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -404,10 +404,6 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
     }
   }
 
-  public boolean shouldParse4Privilege() {
-    return shouldParse4Privilege;
-  }
-
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 2c1cc27b80a..8590fe2002e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -539,7 +539,8 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
                           PrivilegeType.READ_DATA);
               if (status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                 if (skipIfNoPrivileges) {
-                  continue;
+                  tsFileSequenceReader.position(nextMarkerOffset);
+                  break;
                 }
                 throw new AccessDeniedException(status.getMessage());
               }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 8ac2d4e9b71..e353163e726 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
 
 import org.apache.iotdb.commons.audit.IAuditEntity;
+import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
@@ -182,7 +183,7 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
                 }
 
                 private boolean hasTablePrivilege(final String tableName) {
-                  return Objects.isNull(entity)
+                  if (Objects.isNull(entity)
                       || Objects.isNull(sourceEvent)
                       || 
Objects.isNull(sourceEvent.getTableModelDatabaseName())
                       || AuthorityChecker.getAccessControl()
@@ -190,7 +191,18 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
                               entity.getUsername(),
                               new QualifiedObjectName(
                                   sourceEvent.getTableModelDatabaseName(), 
tableName),
-                              entity);
+                              entity)) {
+                    return true;
+                  }
+                  if (!skipIfNoPrivileges) {
+                    throw new AccessDeniedException(
+                        String.format(
+                            "No privilege for SELECT for user %s at table 
%s.%s",
+                            entity.getUsername(),
+                            sourceEvent.getTableModelDatabaseName(),
+                            tableName));
+                  }
+                  return false;
                 }
 
                 @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 612d41d7f47..767d84a7da6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.writeback;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.StatusUtils;
@@ -266,7 +267,9 @@ public class WriteBackSink implements PipeConnector {
                 insertBaseStatement, 
pipeInsertNodeTabletInsertionEvent.getUserName());
 
     if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
-        && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && !(skipIfNoPrivileges
+            && status.getCode() == 
TSStatusCode.NO_PERMISSION.getStatusCode())) {
       throw new PipeException(
           String.format(
               "Write back PipeInsertNodeTabletInsertionEvent %s error, result 
status %s",
@@ -353,7 +356,9 @@ public class WriteBackSink implements PipeConnector {
                 pipeStatementInsertionEvent.getUserName());
 
     if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
-        && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && !(skipIfNoPrivileges
+            && status.getCode() == 
TSStatusCode.NO_PERMISSION.getStatusCode())) {
       throw new PipeException(
           String.format(
               "Write back PipeStatementInsertionEvent %s error, result status 
%s",
@@ -509,6 +514,11 @@ public class WriteBackSink implements PipeConnector {
               
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
               false)
           .status;
+    } catch (final IoTDBRuntimeException e) {
+      if (e.getErrorCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) {
+        return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+      }
+      throw e;
     } finally {
       if (useEventUserName) {
         session.setUsername(originalUserName);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 9a827360f70..b84cce9e8d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -842,11 +842,13 @@ public class TsFileResource implements 
PersistentResource, Cloneable {
    * file physically.
    */
   public boolean remove() {
-    forceMarkDeleted();
     // To release the memory occupied by pipe if held by it
     // Note that pipe can safely handle the case that the time index does not 
exist
     isEmpty();
-    degradeTimeIndex();
+    if (getStatus() != TsFileResourceStatus.UNCLOSED) {
+      degradeTimeIndex();
+    }
+    forceMarkDeleted();
     try {
       fsFactory.deleteIfExists(file);
       fsFactory.deleteIfExists(

Reply via email to