This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 623a9c3fe3a Pipe: Fixed the bug that privilege is not filtered for
real time events && Fixed the bug that the unclosed file may be degraded when
directly deleted && Fixed the skip-if problem of the write-back-sink && Fixed
the problem of tsFile privilege parsing (#17024)
623a9c3fe3a is described below
commit 623a9c3fe3a3a1e84ddd340280b6eff3b5a236c1
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jan 14 21:44:18 2026 +0800
Pipe: Fixed the bug that privilege is not filtered for real time events &&
Fixed the bug that the unclosed file may be degraded when directly deleted &&
Fixed the skip-if problem of the write-back-sink && Fixed the problem of tsFile
privilege parsing (#17024)
* fix
* wb
* reorder
* fix
* IT
* fix
* fldz
* gr
* fix
* 1c1e
* register-more
---
.../treemodel/manual/IoTDBPipePermissionIT.java | 36 ++++++++++++---
.../pipe/it/single/IoTDBPipePermissionIT.java | 51 ++++++++++++++++++++++
.../subtask/processor/PipeProcessorSubtask.java | 21 ++++++++-
.../db/pipe/event/common/PipeInsertionEvent.java | 4 ++
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 --
.../scan/TsFileInsertionEventScanParser.java | 3 +-
.../table/TsFileInsertionEventTableParser.java | 16 ++++++-
.../sink/protocol/writeback/WriteBackSink.java | 14 +++++-
.../dataregion/tsfile/TsFileResource.java | 6 ++-
9 files changed, 136 insertions(+), 19 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index b0f2feffe42..7f35bb1743f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeManual;
-import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
@@ -364,6 +363,27 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count databases root.test1", "count,",
Collections.singleton("1,"));
+ // Write some data
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "create timeSeries root.vehicle.car.temperature DOUBLE",
+ "insert into root.vehicle.car(temperature) values (36.5)"));
+
+ // Exception, skip
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv, "count timeSeries", "count(timeseries),",
Collections.singleton("0,"));
+
+ // Provide time series
+ TestUtils.executeNonQuery(receiverEnv, "create timeSeries
root.vehicle.car.temperature DOUBLE");
+
+ // Exception, skip
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "select count(temperature) from root.vehicle.car",
+ "count(root.vehicle.car.pressure),",
+ Collections.singleton("0,"));
+
// Alter pipe, throw exception if no privileges
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
@@ -377,13 +397,15 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
- "create timeSeries root.vehicle.car.temperature DOUBLE",
- "insert into root.vehicle.car(temperature) values (36.5)"));
+ "create timeSeries root.vehicle.car.pressure DOUBLE",
+ "insert into root.vehicle.car(pressure) values (36.5)"));
// Exception, block here
- TableModelUtils.assertCountDataAlwaysOnEnv("test", "test", 0, receiverEnv);
TestUtils.assertDataAlwaysOnEnv(
- receiverEnv, "count timeSeries", "count(timeseries),",
Collections.singleton("0,"));
+ receiverEnv,
+ "select count(pressure) from root.vehicle.car",
+ "count(root.vehicle.car.pressure),",
+ Collections.singleton("0,"));
// Grant SELECT privilege
TestUtils.executeNonQueries(
@@ -392,8 +414,8 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
// Will finally pass
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
- "select count(*) from root.vehicle.**",
- "count(root.vehicle.car.temperature),",
+ "select count(pressure) from root.vehicle.car",
+ "count(root.vehicle.car.pressure),",
Collections.singleton("1,"));
// test showing pipe
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
index 6779adabf84..c7fa9d12c4e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
@@ -20,12 +20,14 @@
package org.apache.iotdb.pipe.it.single;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -41,6 +43,23 @@ import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT1.class})
public class IoTDBPipePermissionIT extends AbstractPipeSingleIT {
+
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(1);
+ env = MultiEnvFactory.getEnv(0);
+ env.getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setPipeMemoryManagementEnabled(false)
+ .setDataReplicationFactor(1)
+ .setSchemaReplicationFactor(1)
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
+ // 1C1D to directly show the remaining count
+ env.initClusterEnvironment(1, 1);
+ }
+
@Test
public void testSinkPermission() {
TestUtils.executeNonQuery(env, "create user `thulab` 'StrngPsWd@623451'",
null);
@@ -166,6 +185,38 @@ public class IoTDBPipePermissionIT extends
AbstractPipeSingleIT {
} catch (Exception e) {
fail(e.getMessage());
}
+
+ // Tree model
+ try (final Connection connection = env.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE root.test_sink");
+ statement.execute(
+ "CREATE ALIGNED TIMESERIES root.test_sink.d1(s_int INT32,s_long
INT64,s_float float,s_double double)");
+ statement.execute(
+ "INSERT INTO root.test_sink.d1(time,s_int,s_long,s_float,s_double)
VALUES (2025-01-01 03:50:14,1,1,1,1),(2025-01-01
03:50:30,12,14,16.24,18.5),(2025-01-01 03:51:45,22,24,26.24,28.5),(2025-01-01
03:51:59,32,34,36.46,38.6),(2025-01-01 03:52:00,10,14,16.46,18.6)");
+ statement.execute("CREATE USER user_new 'paSs1234@56789'");
+ // Do not grant SYSTEM privilege to avoid auto creation
+ statement.execute("GRANT write ON root.** TO USER user_new");
+ statement.execute(
+ "create pipe user_sink_pipe with source
('pattern'='root.test_sink.d1') with processor
('processor'='aggregate-processor', 'output.database'='root.user_sink_db',
'operators'='avg') with sink
('sink'='write-back-sink','user'='user_new','password'='paSs1234@56789')");
+
+ final long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime <= 20_000L) {
+ try (final ResultSet set = statement.executeQuery("show pipe
user_sink_pipe")) {
+ Assert.assertTrue(set.next());
+ try {
+ Assert.assertEquals("0", set.getString(8));
+ return;
+ } catch (final Throwable t) {
+ // Retry
+ }
+ }
+ }
+ Assert.fail();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
}
@Test
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index d8fbac9e44e..b481117c5e0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
@@ -143,7 +144,25 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
// event can be supplied after the subtask is closed, so we need to
check isClosed here
if (!isClosed.get()) {
if (event instanceof TabletInsertionEvent) {
- pipeProcessor.process((TabletInsertionEvent) event,
outputEventCollector);
+ if (event instanceof PipeInsertNodeTabletInsertionEvent
+ && ((PipeInsertNodeTabletInsertionEvent)
event).shouldParse4Privilege()) {
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ ((PipeInsertNodeTabletInsertionEvent) event)
+ .toRawTabletInsertionEvents()
+ .forEach(
+ rawTabletInsertionEvent -> {
+ try {
+ pipeProcessor.process(rawTabletInsertionEvent,
outputEventCollector);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ });
+ if (ex.get() != null) {
+ throw ex.get();
+ }
+ } else {
+ pipeProcessor.process((TabletInsertionEvent) event,
outputEventCollector);
+ }
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
} else if (event instanceof TsFileInsertionEvent) {
// We have to parse the privilege first, to avoid passing
no-privilege data to processor
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
index ce491b92ef4..3e1f4b476ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
@@ -170,4 +170,8 @@ public abstract class PipeInsertionEvent extends
EnrichedEvent {
this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
this.treeModelDatabaseName =
PathUtils.qualifyDatabaseName(tableModelDatabaseName);
}
+
+ public boolean shouldParse4Privilege() {
+ return shouldParse4Privilege;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d3d601d5754..6b3e505d2ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -404,10 +404,6 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
}
}
- public boolean shouldParse4Privilege() {
- return shouldParse4Privilege;
- }
-
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 2c1cc27b80a..8590fe2002e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -539,7 +539,8 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
PrivilegeType.READ_DATA);
if (status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (skipIfNoPrivileges) {
- continue;
+ tsFileSequenceReader.position(nextMarkerOffset);
+ break;
}
throw new AccessDeniedException(status.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 8ac2d4e9b71..e353163e726 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
import org.apache.iotdb.commons.audit.IAuditEntity;
+import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
@@ -182,7 +183,7 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
}
private boolean hasTablePrivilege(final String tableName) {
- return Objects.isNull(entity)
+ if (Objects.isNull(entity)
|| Objects.isNull(sourceEvent)
||
Objects.isNull(sourceEvent.getTableModelDatabaseName())
|| AuthorityChecker.getAccessControl()
@@ -190,7 +191,18 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
entity.getUsername(),
new QualifiedObjectName(
sourceEvent.getTableModelDatabaseName(),
tableName),
- entity);
+ entity)) {
+ return true;
+ }
+ if (!skipIfNoPrivileges) {
+ throw new AccessDeniedException(
+ String.format(
+ "No privilege for SELECT for user %s at table
%s.%s",
+ entity.getUsername(),
+ sourceEvent.getTableModelDatabaseName(),
+ tableName));
+ }
+ return false;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 612d41d7f47..767d84a7da6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.writeback;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.StatusUtils;
@@ -266,7 +267,9 @@ public class WriteBackSink implements PipeConnector {
insertBaseStatement,
pipeInsertNodeTabletInsertionEvent.getUserName());
if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
- && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && !(skipIfNoPrivileges
+ && status.getCode() ==
TSStatusCode.NO_PERMISSION.getStatusCode())) {
throw new PipeException(
String.format(
"Write back PipeInsertNodeTabletInsertionEvent %s error, result
status %s",
@@ -353,7 +356,9 @@ public class WriteBackSink implements PipeConnector {
pipeStatementInsertionEvent.getUserName());
if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
- && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && !(skipIfNoPrivileges
+ && status.getCode() ==
TSStatusCode.NO_PERMISSION.getStatusCode())) {
throw new PipeException(
String.format(
"Write back PipeStatementInsertionEvent %s error, result status
%s",
@@ -509,6 +514,11 @@ public class WriteBackSink implements PipeConnector {
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
false)
.status;
+ } catch (final IoTDBRuntimeException e) {
+ if (e.getErrorCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) {
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ }
+ throw e;
} finally {
if (useEventUserName) {
session.setUsername(originalUserName);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 9a827360f70..b84cce9e8d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -842,11 +842,13 @@ public class TsFileResource implements
PersistentResource, Cloneable {
* file physically.
*/
public boolean remove() {
- forceMarkDeleted();
// To release the memory occupied by pipe if held by it
// Note that pipe can safely handle the case that the time index does not
exist
isEmpty();
- degradeTimeIndex();
+ if (getStatus() != TsFileResourceStatus.UNCLOSED) {
+ degradeTimeIndex();
+ }
+ forceMarkDeleted();
try {
fsFactory.deleteIfExists(file);
fsFactory.deleteIfExists(