This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 73a0cd8d399 No filtering audit DBs in some procedure & update idle time
after logging in
73a0cd8d399 is described below
commit 73a0cd8d3990812c4b6564fc65b729d37de3d5ca
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Oct 10 14:37:04 2025 +0800
No filtering audit DBs in some procedure & update idle time after logging in
---
.../relational/it/db/it/IoTDBDeletionTableIT.java | 4 +-
.../relational/it/mqtt/IoTDBMQTTServiceIT.java | 85 +++++++++++++++-------
.../iotdb/confignode/manager/ConfigManager.java | 24 +++++-
.../iotdb/confignode/manager/ProcedureManager.java | 8 +-
.../impl/schema/DeleteTimeSeriesProcedure.java | 13 +++-
.../pipe/receiver/PipeEnrichedProcedureTest.java | 2 +-
.../impl/schema/DeleteTimeSeriesProcedureTest.java | 2 +-
.../iotdb/db/protocol/session/SessionManager.java | 1 +
.../config/executor/ClusterConfigTaskExecutor.java | 1 +
.../metadata/DeleteTimeSeriesStatement.java | 9 +++
.../apache/iotdb/db/utils/DataNodeAuthUtils.java | 1 +
.../src/main/thrift/confignode.thrift | 1 +
12 files changed, 111 insertions(+), 40 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java
index 699c0d636e1..316ae614681 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java
@@ -1625,8 +1625,8 @@ public class IoTDBDeletionTableIT {
allDeviceUndeletedRanges.set(i,
mergeRanges(deviceUndeletedRanges));
List<TimeRange> remainingRanges =
collectDataRanges(statement, currentWrittenTime, testNum);
- LOGGER.debug("Expected ranges: {}", deviceUndeletedRanges);
- LOGGER.debug("Remaining ranges: {}", remainingRanges);
+ LOGGER.info("Expected ranges: {}", deviceUndeletedRanges);
+ LOGGER.info("Remaining ranges: {}", remainingRanges);
fail(
String.format(
"Inconsistent number of points %d - %d", expectedCnt,
set.getLong(1)));
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
index 118554d8aad..afc202ae68d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
@@ -25,8 +25,10 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.tsfile.read.common.Field;
+import org.awaitility.Awaitility;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
@@ -38,6 +40,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -90,19 +93,32 @@ public class IoTDBMQTTServiceIT {
EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
String payload1 = "test1,tag1=t1,tag2=t2 field1=1,field2=1f,field3=1i32
1";
- connection.publish(DATABASE + "/myTopic", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
- Thread.sleep(1000);
- try (final SessionDataSet dataSet =
- session.executeQueryStatement(
- "select tag1,tag2,field1,field2,field3 from test1 where time =
1")) {
- assertEquals(5, dataSet.getColumnNames().size());
- List<Field> fields = dataSet.next().getFields();
- assertEquals("t1", fields.get(0).getStringValue());
- assertEquals("t2", fields.get(1).getStringValue());
- assertEquals(1d, fields.get(2).getDoubleV(), 0);
- assertEquals(1f, fields.get(3).getFloatV(), 0);
- assertEquals(1, fields.get(4).getIntV(), 0);
- }
+ Awaitility.await()
+ .atMost(3, TimeUnit.MINUTES)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ connection.publish(
+ DATABASE + "/myTopic", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ try (final SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select tag1,tag2,field1,field2,field3 from test1
where time = 1")) {
+ assertEquals(5, dataSet.getColumnNames().size());
+ List<Field> fields = dataSet.next().getFields();
+ assertEquals("t1", fields.get(0).getStringValue());
+ assertEquals("t2", fields.get(1).getStringValue());
+ assertEquals(1d, fields.get(2).getDoubleV(), 0);
+ assertEquals(1f, fields.get(3).getFloatV(), 0);
+ assertEquals(1, fields.get(4).getIntV(), 0);
+ return true;
+ } catch (StatementExecutionException e) {
+ if (e.getMessage() != null && e.getMessage().contains("does
not exist")) {
+ return false;
+ } else {
+ throw e;
+ }
+ }
+ });
}
}
@@ -112,21 +128,34 @@ public class IoTDBMQTTServiceIT {
EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
String payload1 = "test2,tag1=t1,tag2=t2 attr3=a3,attr4=a4
field1=1,field2=1f,field3=1i32 1";
- connection.publish(DATABASE + "/myTopic", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
- Thread.sleep(1000);
- try (final SessionDataSet dataSet =
- session.executeQueryStatement(
- "select tag1,tag2,attr3,attr4,field1,field2,field3 from test2
where time = 1")) {
- assertEquals(7, dataSet.getColumnNames().size());
- List<Field> fields = dataSet.next().getFields();
- assertEquals("t1", fields.get(0).getStringValue());
- assertEquals("t2", fields.get(1).getStringValue());
- assertEquals("a3", fields.get(2).getStringValue());
- assertEquals("a4", fields.get(3).getStringValue());
- assertEquals(1d, fields.get(4).getDoubleV(), 0);
- assertEquals(1f, fields.get(5).getFloatV(), 0);
- assertEquals(1, fields.get(6).getIntV(), 0);
- }
+ Awaitility.await()
+ .atMost(3, TimeUnit.MINUTES)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ connection.publish(
+ DATABASE + "/myTopic", payload1.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ try (final SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select tag1,tag2,attr3,attr4,field1,field2,field3
from test2 where time = 1")) {
+ assertEquals(7, dataSet.getColumnNames().size());
+ List<Field> fields = dataSet.next().getFields();
+ assertEquals("t1", fields.get(0).getStringValue());
+ assertEquals("t2", fields.get(1).getStringValue());
+ assertEquals("a3", fields.get(2).getStringValue());
+ assertEquals("a4", fields.get(3).getStringValue());
+ assertEquals(1d, fields.get(4).getDoubleV(), 0);
+ assertEquals(1f, fields.get(5).getFloatV(), 0);
+ assertEquals(1, fields.get(6).getIntV(), 0);
+ return true;
+ } catch (StatementExecutionException e) {
+ if (e.getMessage() != null && e.getMessage().contains("does
not exist")) {
+ return false;
+ } else {
+ throw e;
+ }
+ }
+ });
}
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 696bf25be6b..233c3be881b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -2269,7 +2269,11 @@ public class ConfigManager implements IManager {
deleteTimeSeriesPatternPaths.add(path);
}
if (!canOptimize) {
- return procedureManager.deleteTimeSeries(queryId, rawPatternTree,
isGeneratedByPipe);
+ return procedureManager.deleteTimeSeries(
+ queryId,
+ rawPatternTree,
+ isGeneratedByPipe,
+ req.isSetMayDeleteAudit() && req.isMayDeleteAudit());
}
// check if the database is using template
try {
@@ -2286,7 +2290,10 @@ public class ConfigManager implements IManager {
deleteTimeSeriesPatternTree.constructTree();
status =
procedureManager.deleteTimeSeries(
- queryId, deleteTimeSeriesPatternTree, isGeneratedByPipe);
+ queryId,
+ deleteTimeSeriesPatternTree,
+ isGeneratedByPipe,
+ req.isSetMayDeleteAudit() && req.isMayDeleteAudit());
}
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// 2. delete database
@@ -2638,12 +2645,21 @@ public class ConfigManager implements IManager {
/**
* Get all related dataRegion which may contains the data of specific
timeseries matched by given
- * patternTree
+ * patternTree. The audit db is excluded
*/
public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
final PathPatternTree patternTree) {
+ return getRelatedDataRegionGroup(patternTree, false);
+ }
+
+ /**
+ * Get all related dataRegion which may contains the data of specific
timeseries matched by given
+ * patternTree
+ */
+ public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
+ final PathPatternTree patternTree, boolean needAuditDB) {
return getRelatedDataRegionGroup(
- getSchemaPartition(patternTree, false).getSchemaPartitionTable());
+ getSchemaPartition(patternTree,
needAuditDB).getSchemaPartitionTable());
}
public Map<TConsensusGroupId, TRegionReplicaSet>
getRelatedDataRegionGroup4TableModel(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 913c5dbd306..3dd3a2c68ff 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -308,7 +308,10 @@ public class ProcedureManager {
}
public TSStatus deleteTimeSeries(
- String queryId, PathPatternTree patternTree, boolean isGeneratedByPipe) {
+ String queryId,
+ PathPatternTree patternTree,
+ boolean isGeneratedByPipe,
+ boolean mayDeleteAudit) {
DeleteTimeSeriesProcedure procedure = null;
synchronized (this) {
boolean hasOverlappedTask = false;
@@ -336,7 +339,8 @@ public class ProcedureManager {
TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
"Some other task is deleting some target timeseries.");
}
- procedure = new DeleteTimeSeriesProcedure(queryId, patternTree,
isGeneratedByPipe);
+ procedure =
+ new DeleteTimeSeriesProcedure(queryId, patternTree,
isGeneratedByPipe, mayDeleteAudit);
this.executor.submitProcedure(procedure);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
index b953e8f1df0..b143407fda9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java
@@ -71,6 +71,7 @@ public class DeleteTimeSeriesProcedure
private PathPatternTree patternTree;
private transient ByteBuffer patternTreeBytes;
+ private boolean mayDeleteAudit;
private transient String requestMessage;
@@ -86,10 +87,14 @@ public class DeleteTimeSeriesProcedure
}
public DeleteTimeSeriesProcedure(
- final String queryId, final PathPatternTree patternTree, final boolean
isGeneratedByPipe) {
+ final String queryId,
+ final PathPatternTree patternTree,
+ final boolean isGeneratedByPipe,
+ boolean mayDeleteAudit) {
super(isGeneratedByPipe);
this.queryId = queryId;
setPatternTree(patternTree);
+ this.mayDeleteAudit = mayDeleteAudit;
}
@Override
@@ -233,7 +238,7 @@ public class DeleteTimeSeriesProcedure
}
final Map<TConsensusGroupId, TRegionReplicaSet> relatedDataRegionGroup =
- env.getConfigManager().getRelatedDataRegionGroup(patternTree);
+ env.getConfigManager().getRelatedDataRegionGroup(patternTree,
mayDeleteAudit);
// Target timeSeries has no data
if (relatedDataRegionGroup.isEmpty()) {
@@ -359,6 +364,7 @@ public class DeleteTimeSeriesProcedure
super.serialize(stream);
ReadWriteIOUtils.write(queryId, stream);
patternTree.serialize(stream);
+ ReadWriteIOUtils.write(mayDeleteAudit, stream);
}
@Override
@@ -370,6 +376,9 @@ public class DeleteTimeSeriesProcedure
|| getCurrentState() == DeleteTimeSeriesState.DELETE_DATA) {
LOGGER.info("Successfully restored, will set mods to the data regions
anyway");
}
+ if (byteBuffer.hasRemaining()) {
+ mayDeleteAudit = ReadWriteIOUtils.readBoolean(byteBuffer);
+ }
}
@Override
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/receiver/PipeEnrichedProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/receiver/PipeEnrichedProcedureTest.java
index 53d025d6b2f..9a3054cacda 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/receiver/PipeEnrichedProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/receiver/PipeEnrichedProcedureTest.java
@@ -117,7 +117,7 @@ public class PipeEnrichedProcedureTest {
patternTree.appendPathPattern(new PartialPath("root.sg2.*.s1"));
patternTree.constructTree();
DeleteTimeSeriesProcedure deleteTimeSeriesProcedure =
- new DeleteTimeSeriesProcedure(queryId, patternTree, true);
+ new DeleteTimeSeriesProcedure(queryId, patternTree, true, false);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedureTest.java
index 27cc029febf..8223cac998e 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedureTest.java
@@ -43,7 +43,7 @@ public class DeleteTimeSeriesProcedureTest {
patternTree.appendPathPattern(new PartialPath("root.sg2.*.s1"));
patternTree.constructTree();
DeleteTimeSeriesProcedure deleteTimeSeriesProcedure =
- new DeleteTimeSeriesProcedure(queryId, patternTree, false);
+ new DeleteTimeSeriesProcedure(queryId, patternTree, false, false);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index e1851f62dc6..e2f88bb8076 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -223,6 +223,7 @@ public class SessionManager implements SessionManagerMBean {
openSessionResp.getMessage(),
username,
session);
+ updateIdleTime();
if (enableLoginLock) {
loginLockManager.clearFailure(userId, session.getClientAddress());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 35eee66c090..71b6b303a10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2775,6 +2775,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new TDeleteTimeSeriesReq(
queryId,
serializePatternListToByteBuffer(deleteTimeSeriesStatement.getPathPatternList()));
+ req.setMayDeleteAudit(deleteTimeSeriesStatement.isMayDeleteAudit());
try (ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
TSStatus tsStatus;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DeleteTimeSeriesStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DeleteTimeSeriesStatement.java
index f8428d4e026..d51aee564a3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DeleteTimeSeriesStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DeleteTimeSeriesStatement.java
@@ -30,6 +30,7 @@ import java.util.List;
public class DeleteTimeSeriesStatement extends Statement implements
IConfigStatement {
+ private boolean mayDeleteAudit = false;
List<PartialPath> pathPatternList;
public DeleteTimeSeriesStatement() {
@@ -64,4 +65,12 @@ public class DeleteTimeSeriesStatement extends Statement
implements IConfigState
public QueryType getQueryType() {
return QueryType.WRITE;
}
+
+ public void setMayDeleteAudit(boolean mayDeleteAudit) {
+ this.mayDeleteAudit = mayDeleteAudit;
+ }
+
+ public boolean isMayDeleteAudit() {
+ return mayDeleteAudit;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
index 3a96f72f033..656527a097d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
@@ -216,6 +216,7 @@ public class DataNodeAuthUtils {
public static TSStatus deletePasswordHistory(long userId) {
DeleteTimeSeriesStatement deleteTimeSeriesStatement = new
DeleteTimeSeriesStatement();
+ deleteTimeSeriesStatement.setMayDeleteAudit(true);
try {
PartialPath devicePath =
new PartialPath(DNAuditLogger.PREFIX_PASSWORD_HISTORY + ".`_" +
userId + "`");
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 5ecf2af1d35..e209200e048 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -922,6 +922,7 @@ struct TDeleteTimeSeriesReq {
1: required string queryId
2: required binary pathPatternTree
3: optional bool isGeneratedByPipe
+ 4: optional bool mayDeleteAudit
}
struct TDeleteLogicalViewReq {