This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 197890bba8 [To rel/1.1][IOTDB-5719] Move DataNode's checking of
SchemaQuota above the consensus layer (#9626)
197890bba8 is described below
commit 197890bba8148e44ba829ffe3587b5461396e184
Author: Chen YZ <[email protected]>
AuthorDate: Wed Apr 19 10:57:32 2023 +0800
[To rel/1.1][IOTDB-5719] Move DataNode's checking of SchemaQuota above the
consensus layer (#9626)
---
.../schemaregion/rocksdb/RSchemaRegion.java | 5 ++
.../metadata/tagSchemaRegion/TagSchemaRegion.java | 5 ++
.../iotdb/db/metadata/mtree/IMTreeBelowSG.java | 8 +++
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 25 ++++++-
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 21 +++++-
.../db/metadata/mtree/store/CachedMTreeStore.java | 4 +-
.../iotdb/db/metadata/mtree/store/IMTreeStore.java | 2 +-
.../db/metadata/mtree/store/MemMTreeStore.java | 4 +-
.../db/metadata/schemaregion/ISchemaRegion.java | 11 +++
.../schemaregion/SchemaRegionMemoryImpl.java | 18 +++--
.../schemaregion/SchemaRegionSchemaFileImpl.java | 17 ++++-
.../execution/executor/RegionWriteExecutor.java | 84 +++++++++++++++++++++-
.../write/InternalCreateMultiTimeSeriesNode.java | 1 +
13 files changed, 182 insertions(+), 23 deletions(-)
diff --git
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 7a30ea527e..96abaa8c32 100644
---
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -533,6 +533,11 @@ public class RSchemaRegion implements ISchemaRegion {
throw new UnsupportedOperationException();
}
+  /**
+   * Schema quota checking is not implemented by this (RocksDB-backed) schema region.
+   *
+   * @param devicePath unused
+   * @param timeSeriesNum unused
+   * @throws UnsupportedOperationException always; the message names the unsupported operation
+   */
+  @Override
+  public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum) {
+    // Name the operation so the failure is identifiable from the exception alone.
+    throw new UnsupportedOperationException("checkSchemaQuota");
+  }
+
private void createEntityRecursively(String[] nodes, int start, int end,
boolean aligned)
throws RocksDBException, MetadataException, InterruptedException {
if (start <= end) {
diff --git
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 416ed5b547..e7eaca0516 100644
---
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -288,6 +288,11 @@ public class TagSchemaRegion implements ISchemaRegion {
throw new UnsupportedOperationException("checkMeasurementExistence");
}
+  /**
+   * Schema quota checking is not implemented by the tag schema region.
+   *
+   * @param devicePath unused
+   * @param timeSeriesNum unused
+   * @throws UnsupportedOperationException always; the message names the unsupported operation
+   */
+  @Override
+  public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum) {
+    // Same convention as checkMeasurementExistence in this class: the message is the method name.
+    throw new UnsupportedOperationException("checkSchemaQuota");
+  }
+
private void filterExistingMeasurements(
ICreateAlignedTimeSeriesPlan plan, Set<String> measurementSet) {
List<String> measurements = plan.getMeasurements();
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
index de0aa5b8e0..9f08141bb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
@@ -139,6 +139,14 @@ public interface IMTreeBelowSG {
*/
IMNode getDeviceNodeWithAutoCreating(PartialPath deviceId) throws
MetadataException;
+ /**
+ * Check whether the node at the given path exists and is a device (entity) node.
+ *
+ * <p>The method declares no checked exception: a path that cannot be resolved is reported as
+ * {@code false} instead of being raised as an error.
+ *
+ * @param deviceId full path of device
+ * @return true if the node exists and is an entity, false otherwise
+ */
+ boolean checkDeviceNodeExists(PartialPath deviceId);
+
/**
* Fetch all measurement path
*
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 1ce8393868..ae670621dc 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -61,7 +61,6 @@ import
org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
-import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -719,6 +718,28 @@ public class MTreeBelowSGCachedImpl implements
IMTreeBelowSG {
unPinPath(cur);
}
}
+
+  /**
+   * Tells whether {@code deviceId} resolves to an existing entity (device) node.
+   *
+   * <p>A lookup that fails with a {@link MetadataException} is reported as {@code false}. A node
+   * returned by the lookup is handed to {@code unPinMNode} before this method returns.
+   *
+   * @param deviceId full path of device
+   * @return true if the node exists and is an entity, false otherwise
+   */
+  @Override
+  public boolean checkDeviceNodeExists(PartialPath deviceId) {
+    final IMNode resolved;
+    try {
+      resolved = getNodeByPath(deviceId);
+    } catch (MetadataException lookupFailure) {
+      // Nothing was obtained from the lookup, so there is nothing to release.
+      return false;
+    }
+    try {
+      return resolved.isEntity();
+    } finally {
+      // Guard kept so that a null lookup result is never passed to unPinMNode.
+      if (resolved != null) {
+        unPinMNode(resolved);
+      }
+    }
+  }
+
// endregion
// region Interfaces and Implementation for metadata info Query
@@ -841,8 +862,6 @@ public class MTreeBelowSGCachedImpl implements
IMTreeBelowSG {
throw new DifferentTemplateException(activatePath.getFullPath(),
template.getName());
}
}
- DataNodeSchemaQuotaManager.getInstance()
- .checkMeasurementLevel(template.getMeasurementNumber());
if (cur.isEntity()) {
entityMNode = cur.getAsEntityMNode();
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index f77debe0c9..26376b38f0 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -60,7 +60,6 @@ import
org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
-import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
@@ -601,6 +600,24 @@ public class MTreeBelowSGMemoryImpl implements
IMTreeBelowSG {
}
return cur;
}
+
+  /**
+   * Tells whether {@code deviceId} resolves to an existing entity (device) node.
+   *
+   * <p>A lookup that fails with a {@link MetadataException} is reported as {@code false}.
+   *
+   * @param deviceId full path of device
+   * @return true if the node exists and is an entity, false otherwise
+   */
+  @Override
+  public boolean checkDeviceNodeExists(PartialPath deviceId) {
+    try {
+      return getNodeByPath(deviceId).isEntity();
+    } catch (MetadataException lookupFailure) {
+      return false;
+    }
+  }
+
// endregion
// region Interfaces and Implementation for metadata info Query
@@ -711,8 +728,6 @@ public class MTreeBelowSGMemoryImpl implements
IMTreeBelowSG {
throw new DifferentTemplateException(activatePath.getFullPath(),
template.getName());
}
}
- DataNodeSchemaQuotaManager.getInstance()
- .checkMeasurementLevel(template.getMeasurementNumber());
if (cur.isEntity()) {
entityMNode = cur.getAsEntityMNode();
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
index 0a329b3cc3..b53bbb3f28 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.metadata.mtree.store;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.metadata.cache.MNodeNotCachedException;
import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
@@ -327,8 +326,7 @@ public class CachedMTreeStore implements IMTreeStore {
}
@Override
- public IEntityMNode setToEntity(IMNode node) throws
SchemaQuotaExceededException {
- quotaManager.checkDeviceLevel();
+ public IEntityMNode setToEntity(IMNode node) {
IEntityMNode result = MNodeUtils.setToEntity(node);
if (result != node) {
regionStatistics.addDevice();
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
index e15311927f..9f0d0fa5b9 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
@@ -75,7 +75,7 @@ public interface IMTreeStore {
void updateMNode(IMNode node) throws MetadataException;
- IEntityMNode setToEntity(IMNode node) throws MetadataException;
+ IEntityMNode setToEntity(IMNode node);
IMNode setToInternal(IEntityMNode entityMNode) throws MetadataException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
index fd5dedbddb..e08b4e4423 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
import org.apache.iotdb.db.metadata.mnode.IMNode;
@@ -158,8 +157,7 @@ public class MemMTreeStore implements IMTreeStore {
public void updateMNode(IMNode node) {}
@Override
- public IEntityMNode setToEntity(IMNode node) throws
SchemaQuotaExceededException {
- schemaQuotaManager.checkDeviceLevel();
+ public IEntityMNode setToEntity(IMNode node) {
IEntityMNode result = MNodeUtils.setToEntity(node);
if (result != node) {
regionStatistics.addDevice();
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 9e9d8922f2..ba20c892fe 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.metadata.metric.ISchemaRegionMetric;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowNodesPlan;
@@ -127,6 +128,16 @@ public interface ISchemaRegion {
Map<Integer, MetadataException> checkMeasurementExistence(
PartialPath devicePath, List<String> measurementList, List<String>
aliasList);
+ /**
+ * Check whether the schema quota still allows the requested time series to be created.
+ *
+ * <p>In the memory and schema-file implementations the check depends on the configured quota
+ * level: at time-series level, {@code timeSeriesNum} is checked against the measurement quota;
+ * at device level, the device quota is checked only when {@code devicePath} is not already an
+ * existing device. The RocksDB and tag schema regions do not support this operation and throw
+ * {@link UnsupportedOperationException}.
+ *
+ * @param devicePath the path of device that you want to check
+ * @param timeSeriesNum the number of time series that you want to check
+ * @throws SchemaQuotaExceededException if the number of time series or devices exceeds the
+ * limit
+ */
+ void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
+ throws SchemaQuotaExceededException;
+
/**
* Construct schema black list via setting matched timeseries to pre deleted.
*
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 43e758170e..6c18a976a2 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.idtable.IDTable;
@@ -521,8 +523,6 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
throw new SeriesOverflowException();
}
- schemaQuotaManager.checkMeasurementLevel(1);
-
try {
IMeasurementMNode leafMNode;
PartialPath path = plan.getPath();
@@ -590,8 +590,6 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
throw new SeriesOverflowException();
}
- schemaQuotaManager.checkMeasurementLevel(seriesCount);
-
try {
PartialPath prefixPath = plan.getDevicePath();
List<String> measurements = plan.getMeasurements();
@@ -678,6 +676,18 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
return mtree.checkMeasurementExistence(devicePath, measurementList,
aliasList);
}
+ @Override
+ public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
+ throws SchemaQuotaExceededException {
+ if
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.TIMESERIES)) {
+ schemaQuotaManager.checkMeasurementLevel(timeSeriesNum);
+ } else if
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.DEVICE)) {
+ if (!mtree.checkDeviceNodeExists(devicePath)) {
+ schemaQuotaManager.checkDeviceLevel();
+ }
+ }
+ }
+
@Override
public long constructSchemaBlackList(PathPatternTree patternTree) throws
MetadataException {
long preDeletedNum = 0;
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 79bf93e2d6..95fdbac528 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import
org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.idtable.IDTable;
@@ -585,8 +587,6 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
CacheMemoryManager.getInstance().waitIfReleasing();
}
- schemaQuotaManager.checkMeasurementLevel(1);
-
try {
PartialPath path = plan.getPath();
IMeasurementMNode leafMNode;
@@ -692,7 +692,6 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
while (!regionStatistics.isAllowToCreateNewSeries()) {
CacheMemoryManager.getInstance().waitIfReleasing();
}
- schemaQuotaManager.checkMeasurementLevel(seriesCount);
try {
PartialPath prefixPath = plan.getDevicePath();
@@ -790,6 +789,18 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
return mtree.checkMeasurementExistence(devicePath, measurementList,
aliasList);
}
+ @Override
+ public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
+ throws SchemaQuotaExceededException {
+ if
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.TIMESERIES)) {
+ schemaQuotaManager.checkMeasurementLevel(timeSeriesNum);
+ } else if
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.DEVICE)) {
+ if (!mtree.checkDeviceNodeExists(devicePath)) {
+ schemaQuotaManager.checkDeviceLevel();
+ }
+ }
+ }
+
@Override
public long constructSchemaBlackList(PathPatternTree patternTree) throws
MetadataException {
long preDeletedNum = 0;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c035edabe5..7c89d72681 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
@@ -309,6 +310,11 @@ public class RegionWriteExecutor {
CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
ISchemaRegion schemaRegion =
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ RegionExecutionResult result =
+ checkQuotaBeforeCreatingTimeSeries(schemaRegion,
node.getPath().getDevicePath(), 1);
+ if (result != null) {
+ return result;
+ }
if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
@@ -322,7 +328,7 @@ public class RegionWriteExecutor {
} else {
MetadataException metadataException = failingMeasurementMap.get(0);
LOGGER.error("Metadata error: ", metadataException);
- RegionExecutionResult result = new RegionExecutionResult();
+ result = new RegionExecutionResult();
result.setAccepted(false);
result.setMessage(metadataException.getMessage());
result.setStatus(
@@ -343,6 +349,12 @@ public class RegionWriteExecutor {
CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
ISchemaRegion schemaRegion =
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ RegionExecutionResult result =
+ checkQuotaBeforeCreatingTimeSeries(
+ schemaRegion, node.getDevicePath(),
node.getMeasurements().size());
+ if (result != null) {
+ return result;
+ }
if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
@@ -354,7 +366,7 @@ public class RegionWriteExecutor {
} else {
MetadataException metadataException =
failingMeasurementMap.values().iterator().next();
LOGGER.error("Metadata error: ", metadataException);
- RegionExecutionResult result = new RegionExecutionResult();
+ result = new RegionExecutionResult();
result.setAccepted(false);
result.setMessage(metadataException.getMessage());
result.setStatus(
@@ -375,6 +387,16 @@ public class RegionWriteExecutor {
CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context)
{
ISchemaRegion schemaRegion =
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ RegionExecutionResult result;
+ for (Map.Entry<PartialPath, MeasurementGroup> entry :
+ node.getMeasurementGroupMap().entrySet()) {
+ result =
+ checkQuotaBeforeCreatingTimeSeries(
+ schemaRegion, entry.getKey(),
entry.getValue().getMeasurements().size());
+ if (result != null) {
+ return result;
+ }
+ }
if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
@@ -444,6 +466,12 @@ public class RegionWriteExecutor {
InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
ISchemaRegion schemaRegion =
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ RegionExecutionResult result =
+ checkQuotaBeforeCreatingTimeSeries(
+ schemaRegion, node.getDevicePath(),
node.getMeasurementGroup().size());
+ if (result != null) {
+ return result;
+ }
if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
@@ -495,6 +523,16 @@ public class RegionWriteExecutor {
InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
ISchemaRegion schemaRegion =
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ RegionExecutionResult result;
+ for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry
:
+ node.getDeviceMap().entrySet()) {
+ result =
+ checkQuotaBeforeCreatingTimeSeries(
+ schemaRegion, deviceEntry.getKey(),
deviceEntry.getValue().getRight().size());
+ if (result != null) {
+ return result;
+ }
+ }
if
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
@@ -547,6 +585,25 @@ public class RegionWriteExecutor {
}
}
+  /**
+   * Runs the schema quota check of the given schema region and turns a quota violation into a
+   * rejected execution result.
+   *
+   * @param schemaRegion schema region whose quota is checked
+   * @param path device path passed on to the quota check
+   * @param size number of time series passed on to the quota check
+   * @return null if the quota is not exceeded, otherwise a non-accepted result carrying the
+   *     message and error code of the quota exception
+   */
+  private RegionExecutionResult checkQuotaBeforeCreatingTimeSeries(
+      ISchemaRegion schemaRegion, PartialPath path, int size) {
+    try {
+      schemaRegion.checkSchemaQuota(path, size);
+      // null means "no objection": the caller carries on with the write.
+      return null;
+    } catch (SchemaQuotaExceededException quotaExceeded) {
+      RegionExecutionResult rejected = new RegionExecutionResult();
+      rejected.setAccepted(false);
+      rejected.setMessage(quotaExceeded.getMessage());
+      rejected.setStatus(
+          RpcUtils.getStatus(quotaExceeded.getErrorCode(), quotaExceeded.getMessage()));
+      return rejected;
+    }
+  }
+
private RegionExecutionResult processExecutionResultOfInternalCreateSchema(
RegionExecutionResult executionResult,
List<TSStatus> failingStatus,
@@ -617,7 +674,12 @@ public class RegionWriteExecutor {
result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
message));
return result;
}
- return super.visitActivateTemplate(node, context);
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
+ RegionExecutionResult result =
+ checkQuotaBeforeCreatingTimeSeries(
+ schemaRegion, node.getActivatePath(),
templateSetInfo.left.getMeasurementNumber());
+ return result == null ? super.visitActivateTemplate(node, context) :
result;
} finally {
context.getRegionWriteValidationRWLock().readLock().unlock();
}
@@ -629,6 +691,8 @@ public class RegionWriteExecutor {
// activate template operation shall be blocked by unset template check
context.getRegionWriteValidationRWLock().readLock().lock();
try {
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
for (PartialPath devicePath :
node.getTemplateActivationMap().keySet()) {
Pair<Template, PartialPath> templateSetInfo =
ClusterTemplateManager.getInstance().checkTemplateSetInfo(devicePath);
@@ -645,6 +709,12 @@ public class RegionWriteExecutor {
result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
message));
return result;
}
+ RegionExecutionResult result =
+ checkQuotaBeforeCreatingTimeSeries(
+ schemaRegion, devicePath,
templateSetInfo.left.getMeasurementNumber());
+ if (result != null) {
+ return result;
+ }
}
return super.visitBatchActivateTemplate(node, context);
@@ -659,6 +729,8 @@ public class RegionWriteExecutor {
// activate template operation shall be blocked by unset template check
context.getRegionWriteValidationRWLock().readLock().lock();
try {
+ ISchemaRegion schemaRegion =
+ SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
context.getRegionId());
for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
node.getTemplateActivationMap().entrySet()) {
Pair<Template, PartialPath> templateSetInfo =
@@ -678,6 +750,12 @@ public class RegionWriteExecutor {
result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
message));
return result;
}
+ RegionExecutionResult result =
+ checkQuotaBeforeCreatingTimeSeries(
+ schemaRegion, entry.getKey(),
templateSetInfo.left.getMeasurementNumber());
+ if (result != null) {
+ return result;
+ }
}
return super.visitInternalBatchActivateTemplate(node, context);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
index 40cd224e2f..bfbbe1c633 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
@@ -41,6 +41,7 @@ import java.util.Map;
public class InternalCreateMultiTimeSeriesNode extends WritePlanNode {
+ // <DevicePath, <IsAligned, MeasurementGroup>>
private Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap;
private TRegionReplicaSet regionReplicaSet;