This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rc/2.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/2.0.1 by this push:
new cd74e6dee9c Implement schema quota check at schemaRegion for table device
cd74e6dee9c is described below
commit cd74e6dee9c963cd9d7e81f4e5852270318ff5b9
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 2 03:56:13 2024 +0800
Implement schema quota check at schemaRegion for table device
---
.../execution/executor/RegionWriteExecutor.java | 30 ++++++++++++++++++++++
.../schemaengine/schemaregion/ISchemaRegion.java | 10 ++++++++
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 18 +++++++++++--
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 7 ++++-
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 25 +++++++++++++++++-
5 files changed, 86 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 70fdbe5caeb..cbfe1682e19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -65,6 +65,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOf
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
@@ -985,6 +986,29 @@ public class RegionWriteExecutor {
// end of visitCreateLogicalView
}
+ @Override
+ public RegionExecutionResult visitCreateOrUpdateTableDevice(
+ final CreateOrUpdateTableDeviceNode node, final WritePlanNodeExecutionContext context) {
+ return executeCreateOrUpdateTableDevice(node, context, false);
+ }
+
+ private RegionExecutionResult executeCreateOrUpdateTableDevice(
+ final CreateOrUpdateTableDeviceNode node,
+ final WritePlanNodeExecutionContext context,
+ final boolean receivedFromPipe) {
+ final ISchemaRegion schemaRegion =
+ schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
+ try {
+ schemaRegion.checkSchemaQuota(node.getTableName(), node.getDeviceIdList());
+ } catch (final SchemaQuotaExceededException e) {
+ return RegionExecutionResult.create(
+ false, e.getMessage(), RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ return receivedFromPipe
+ ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context)
+ : super.visitCreateOrUpdateTableDevice(node, context);
+ }
+
@Override
public RegionExecutionResult visitPipeEnrichedWritePlanNode(
PipeEnrichedWritePlanNode node, WritePlanNodeExecutionContext context) {
@@ -1066,6 +1090,12 @@ public class RegionWriteExecutor {
CreateLogicalViewNode node, WritePlanNodeExecutionContext context) {
return visitor.executeCreateLogicalView(node, context, true);
}
+
+ @Override
+ public RegionExecutionResult visitCreateOrUpdateTableDevice(
+ final CreateOrUpdateTableDeviceNode node, final WritePlanNodeExecutionContext context) {
+ return visitor.executeCreateOrUpdateTableDevice(node, context, true);
+ }
}
private static class WritePlanNodeExecutionContext {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
index e096673f315..840a1630f1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
@@ -158,6 +158,16 @@ public interface ISchemaRegion {
void checkSchemaQuota(final PartialPath devicePath, final int timeSeriesNum)
throws SchemaQuotaExceededException;
+ /**
+ * Check whether table device can be created.
+ *
+ * @param tableName the table of the created device
+ * @param deviceIdList the id segments of the created device
+ * @throws SchemaQuotaExceededException if the number of time series or devices exceeds the limit
+ */
+ void checkSchemaQuota(final String tableName, final List<Object[]> deviceIdList)
+ throws SchemaQuotaExceededException;
+
/**
* Construct schema black list via setting matched time series to preDeleted.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 2b49792fbc3..26d75d28465 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -117,6 +117,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.ICreateLogic
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IDeleteLogicalViewPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IPreDeleteLogicalViewPlan;
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IRollbackPreDeleteLogicalViewPlan;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.SchemaUtils;
@@ -854,7 +855,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
@Override
- public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
+ public void checkSchemaQuota(final PartialPath devicePath, final int timeSeriesNum)
throws SchemaQuotaExceededException {
if (!mtree.checkDeviceNodeExists(devicePath)) {
schemaQuotaManager.check(timeSeriesNum, 1);
@@ -863,6 +864,19 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
}
+ @Override
+ public void checkSchemaQuota(final String tableName, final List<Object[]> deviceIdList)
+ throws SchemaQuotaExceededException {
+ final int notExistNum = mtree.getTableDeviceNotExistNum(tableName, deviceIdList);
+ schemaQuotaManager.check(
+ (long)
+ DataNodeTableCache.getInstance()
+ .getTable(storageGroupFullPath.substring(5), tableName)
+ .getMeasurementNum()
+ * notExistNum,
+ notExistNum);
+ }
+
@Override
public Pair<Long, Boolean> constructSchemaBlackList(final PathPatternTree patternTree)
throws MetadataException {
@@ -1388,7 +1402,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
final List<String> attributeNameList = node.getAttributeNameList();
final Object[] attributeValueList = node.getAttributeValueList().get(i);
- mtree.createTableDevice(
+ mtree.createOrUpdateTableDevice(
tableName,
deviceId,
() -> deviceAttributeStore.createAttribute(attributeNameList, attributeValueList),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 9db4dc5ab7f..5e09c9e4a1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -858,7 +858,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion {
}
@Override
- public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
+ public void checkSchemaQuota(final PartialPath devicePath, final int timeSeriesNum)
throws SchemaQuotaExceededException {
if (!mtree.checkDeviceNodeExists(devicePath)) {
schemaQuotaManager.check(timeSeriesNum, 1);
@@ -867,6 +867,11 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion {
}
}
+ @Override
+ public void checkSchemaQuota(final String tableName, final List<Object[]> deviceIdList) {
+ throw new UnsupportedOperationException("TableModel does not support PBTree yet.");
+ }
+
@Override
public Pair<Long, Boolean> constructSchemaBlackList(final PathPatternTree patternTree)
throws MetadataException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index 9423aa9815a..d9a09bed98d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -98,6 +98,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1576,7 +1577,29 @@ public class MTreeBelowSGMemoryImpl {
// region table device management
- public void createTableDevice(
+ public int getTableDeviceNotExistNum(final String tableName, final List<Object[]> deviceIdList) {
+ final IMemMNode tableNode = storageGroupMNode.getChild(tableName);
+ int notExistNum = deviceIdList.size();
+ if (tableNode == null) {
+ return notExistNum;
+ }
+ IMemMNode cur;
+ for (final Object[] deviceId : deviceIdList) {
+ cur = tableNode;
+ for (final Object device : deviceId) {
+ cur = cur.getChild(Objects.nonNull(device) ? device.toString() : null);
+ if (cur == null) {
+ break;
+ }
+ }
+ if (Objects.nonNull(cur)) {
+ notExistNum--;
+ }
+ }
+ return notExistNum;
+ }
+
+ public void createOrUpdateTableDevice(
final String tableName,
final String[] devicePath,
final IntSupplier attributePointerGetter,