This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 197890bba8 [To rel/1.1][IOTDB-5719] Move DataNode's checking of 
SchemaQuota above the consensus layer (#9626)
197890bba8 is described below

commit 197890bba8148e44ba829ffe3587b5461396e184
Author: Chen YZ <[email protected]>
AuthorDate: Wed Apr 19 10:57:32 2023 +0800

    [To rel/1.1][IOTDB-5719] Move DataNode's checking of SchemaQuota above the 
consensus layer (#9626)
---
 .../schemaregion/rocksdb/RSchemaRegion.java        |  5 ++
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |  5 ++
 .../iotdb/db/metadata/mtree/IMTreeBelowSG.java     |  8 +++
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  | 25 ++++++-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  | 21 +++++-
 .../db/metadata/mtree/store/CachedMTreeStore.java  |  4 +-
 .../iotdb/db/metadata/mtree/store/IMTreeStore.java |  2 +-
 .../db/metadata/mtree/store/MemMTreeStore.java     |  4 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    | 11 +++
 .../schemaregion/SchemaRegionMemoryImpl.java       | 18 +++--
 .../schemaregion/SchemaRegionSchemaFileImpl.java   | 17 ++++-
 .../execution/executor/RegionWriteExecutor.java    | 84 +++++++++++++++++++++-
 .../write/InternalCreateMultiTimeSeriesNode.java   |  1 +
 13 files changed, 182 insertions(+), 23 deletions(-)

diff --git 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 7a30ea527e..96abaa8c32 100644
--- 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -533,6 +533,11 @@ public class RSchemaRegion implements ISchemaRegion {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum) {
+    throw new UnsupportedOperationException();
+  }
+
   private void createEntityRecursively(String[] nodes, int start, int end, 
boolean aligned)
       throws RocksDBException, MetadataException, InterruptedException {
     if (start <= end) {
diff --git 
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
 
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 416ed5b547..e7eaca0516 100644
--- 
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ 
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -288,6 +288,11 @@ public class TagSchemaRegion implements ISchemaRegion {
     throw new UnsupportedOperationException("checkMeasurementExistence");
   }
 
+  @Override
+  public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum) {
+    throw new UnsupportedOperationException();
+  }
+
   private void filterExistingMeasurements(
       ICreateAlignedTimeSeriesPlan plan, Set<String> measurementSet) {
     List<String> measurements = plan.getMeasurements();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
index de0aa5b8e0..9f08141bb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
@@ -139,6 +139,14 @@ public interface IMTreeBelowSG {
    */
   IMNode getDeviceNodeWithAutoCreating(PartialPath deviceId) throws 
MetadataException;
 
+  /**
+   * Check if the device node exists
+   *
+   * @param deviceId full path of device
+   * @return true if the device node exists
+   */
+  boolean checkDeviceNodeExists(PartialPath deviceId);
+
   /**
    * Fetch all measurement path
    *
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 1ce8393868..ae670621dc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -61,7 +61,6 @@ import 
org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
 import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
-import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -719,6 +718,28 @@ public class MTreeBelowSGCachedImpl implements 
IMTreeBelowSG {
       unPinPath(cur);
     }
   }
+
+  /**
+   * Check if the device node exists
+   *
+   * @param deviceId full path of device
+   * @return true if the device node exists
+   */
+  @Override
+  public boolean checkDeviceNodeExists(PartialPath deviceId) {
+    IMNode deviceMNode = null;
+    try {
+      deviceMNode = getNodeByPath(deviceId);
+      return deviceMNode.isEntity();
+    } catch (MetadataException e) {
+      return false;
+    } finally {
+      if (deviceMNode != null) {
+        unPinMNode(deviceMNode);
+      }
+    }
+  }
+
   // endregion
 
   // region Interfaces and Implementation for metadata info Query
@@ -841,8 +862,6 @@ public class MTreeBelowSGCachedImpl implements 
IMTreeBelowSG {
             throw new DifferentTemplateException(activatePath.getFullPath(), 
template.getName());
           }
         }
-        DataNodeSchemaQuotaManager.getInstance()
-            .checkMeasurementLevel(template.getMeasurementNumber());
 
         if (cur.isEntity()) {
           entityMNode = cur.getAsEntityMNode();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index f77debe0c9..26376b38f0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -60,7 +60,6 @@ import 
org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
-import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
@@ -601,6 +600,24 @@ public class MTreeBelowSGMemoryImpl implements 
IMTreeBelowSG {
     }
     return cur;
   }
+
+  /**
+   * Check if the device node exists
+   *
+   * @param deviceId full path of device
+   * @return true if the device node exists
+   */
+  @Override
+  public boolean checkDeviceNodeExists(PartialPath deviceId) {
+    IMNode deviceMNode = null;
+    try {
+      deviceMNode = getNodeByPath(deviceId);
+      return deviceMNode.isEntity();
+    } catch (MetadataException e) {
+      return false;
+    }
+  }
+
   // endregion
 
   // region Interfaces and Implementation for metadata info Query
@@ -711,8 +728,6 @@ public class MTreeBelowSGMemoryImpl implements 
IMTreeBelowSG {
           throw new DifferentTemplateException(activatePath.getFullPath(), 
template.getName());
         }
       }
-      DataNodeSchemaQuotaManager.getInstance()
-          .checkMeasurementLevel(template.getMeasurementNumber());
 
       if (cur.isEntity()) {
         entityMNode = cur.getAsEntityMNode();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
index 0a329b3cc3..b53bbb3f28 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.metadata.mtree.store;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.metadata.cache.MNodeNotCachedException;
 import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
 import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
@@ -327,8 +326,7 @@ public class CachedMTreeStore implements IMTreeStore {
   }
 
   @Override
-  public IEntityMNode setToEntity(IMNode node) throws 
SchemaQuotaExceededException {
-    quotaManager.checkDeviceLevel();
+  public IEntityMNode setToEntity(IMNode node) {
     IEntityMNode result = MNodeUtils.setToEntity(node);
     if (result != node) {
       regionStatistics.addDevice();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
index e15311927f..9f0d0fa5b9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
@@ -75,7 +75,7 @@ public interface IMTreeStore {
 
   void updateMNode(IMNode node) throws MetadataException;
 
-  IEntityMNode setToEntity(IMNode node) throws MetadataException;
+  IEntityMNode setToEntity(IMNode node);
 
   IMNode setToInternal(IEntityMNode entityMNode) throws MetadataException;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
index fd5dedbddb..e08b4e4423 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
 import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
@@ -158,8 +157,7 @@ public class MemMTreeStore implements IMTreeStore {
   public void updateMNode(IMNode node) {}
 
   @Override
-  public IEntityMNode setToEntity(IMNode node) throws 
SchemaQuotaExceededException {
-    schemaQuotaManager.checkDeviceLevel();
+  public IEntityMNode setToEntity(IMNode node) {
     IEntityMNode result = MNodeUtils.setToEntity(node);
     if (result != node) {
       regionStatistics.addDevice();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 9e9d8922f2..ba20c892fe 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.metadata.metric.ISchemaRegionMetric;
 import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
 import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowNodesPlan;
@@ -127,6 +128,16 @@ public interface ISchemaRegion {
   Map<Integer, MetadataException> checkMeasurementExistence(
       PartialPath devicePath, List<String> measurementList, List<String> 
aliasList);
 
+  /**
+   * Check whether time series can be created.
+   *
+   * @param devicePath the path of device that you want to check
+   * @param timeSeriesNum the number of time series that you want to check
+   * @throws SchemaQuotaExceededException if the number of time series or 
devices exceeds the limit
+   */
+  void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
+      throws SchemaQuotaExceededException;
+
   /**
    * Construct schema black list via setting matched timeseries to pre deleted.
    *
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 43e758170e..6c18a976a2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
@@ -521,8 +523,6 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
       throw new SeriesOverflowException();
     }
 
-    schemaQuotaManager.checkMeasurementLevel(1);
-
     try {
       IMeasurementMNode leafMNode;
       PartialPath path = plan.getPath();
@@ -590,8 +590,6 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
       throw new SeriesOverflowException();
     }
 
-    schemaQuotaManager.checkMeasurementLevel(seriesCount);
-
     try {
       PartialPath prefixPath = plan.getDevicePath();
       List<String> measurements = plan.getMeasurements();
@@ -678,6 +676,18 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
     return mtree.checkMeasurementExistence(devicePath, measurementList, 
aliasList);
   }
 
+  @Override
+  public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
+      throws SchemaQuotaExceededException {
+    if 
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.TIMESERIES)) {
+      schemaQuotaManager.checkMeasurementLevel(timeSeriesNum);
+    } else if 
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.DEVICE)) {
+      if (!mtree.checkDeviceNodeExists(devicePath)) {
+        schemaQuotaManager.checkDeviceLevel();
+      }
+    }
+  }
+
   @Override
   public long constructSchemaBlackList(PathPatternTree patternTree) throws 
MetadataException {
     long preDeletedNum = 0;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 79bf93e2d6..95fdbac528 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import 
org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
@@ -585,8 +587,6 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
       CacheMemoryManager.getInstance().waitIfReleasing();
     }
 
-    schemaQuotaManager.checkMeasurementLevel(1);
-
     try {
       PartialPath path = plan.getPath();
       IMeasurementMNode leafMNode;
@@ -692,7 +692,6 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
     while (!regionStatistics.isAllowToCreateNewSeries()) {
       CacheMemoryManager.getInstance().waitIfReleasing();
     }
-    schemaQuotaManager.checkMeasurementLevel(seriesCount);
 
     try {
       PartialPath prefixPath = plan.getDevicePath();
@@ -790,6 +789,18 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
     return mtree.checkMeasurementExistence(devicePath, measurementList, 
aliasList);
   }
 
+  @Override
+  public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
+      throws SchemaQuotaExceededException {
+    if 
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.TIMESERIES)) {
+      schemaQuotaManager.checkMeasurementLevel(timeSeriesNum);
+    } else if 
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.DEVICE)) {
+      if (!mtree.checkDeviceNodeExists(devicePath)) {
+        schemaQuotaManager.checkDeviceLevel();
+      }
+    }
+  }
+
   @Override
   public long constructSchemaBlackList(PathPatternTree patternTree) throws 
MetadataException {
     long preDeletedNum = 0;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c035edabe5..7c89d72681 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
@@ -309,6 +310,11 @@ public class RegionWriteExecutor {
         CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
       ISchemaRegion schemaRegion =
           SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      RegionExecutionResult result =
+          checkQuotaBeforeCreatingTimeSeries(schemaRegion, 
node.getPath().getDevicePath(), 1);
+      if (result != null) {
+        return result;
+      }
       if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
@@ -322,7 +328,7 @@ public class RegionWriteExecutor {
           } else {
             MetadataException metadataException = failingMeasurementMap.get(0);
             LOGGER.error("Metadata error: ", metadataException);
-            RegionExecutionResult result = new RegionExecutionResult();
+            result = new RegionExecutionResult();
             result.setAccepted(false);
             result.setMessage(metadataException.getMessage());
             result.setStatus(
@@ -343,6 +349,12 @@ public class RegionWriteExecutor {
         CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
       ISchemaRegion schemaRegion =
           SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      RegionExecutionResult result =
+          checkQuotaBeforeCreatingTimeSeries(
+              schemaRegion, node.getDevicePath(), 
node.getMeasurements().size());
+      if (result != null) {
+        return result;
+      }
       if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
@@ -354,7 +366,7 @@ public class RegionWriteExecutor {
           } else {
             MetadataException metadataException = 
failingMeasurementMap.values().iterator().next();
             LOGGER.error("Metadata error: ", metadataException);
-            RegionExecutionResult result = new RegionExecutionResult();
+            result = new RegionExecutionResult();
             result.setAccepted(false);
             result.setMessage(metadataException.getMessage());
             result.setStatus(
@@ -375,6 +387,16 @@ public class RegionWriteExecutor {
         CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) 
{
       ISchemaRegion schemaRegion =
           SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      RegionExecutionResult result;
+      for (Map.Entry<PartialPath, MeasurementGroup> entry :
+          node.getMeasurementGroupMap().entrySet()) {
+        result =
+            checkQuotaBeforeCreatingTimeSeries(
+                schemaRegion, entry.getKey(), 
entry.getValue().getMeasurements().size());
+        if (result != null) {
+          return result;
+        }
+      }
       if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
@@ -444,6 +466,12 @@ public class RegionWriteExecutor {
         InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
       ISchemaRegion schemaRegion =
           SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      RegionExecutionResult result =
+          checkQuotaBeforeCreatingTimeSeries(
+              schemaRegion, node.getDevicePath(), 
node.getMeasurementGroup().size());
+      if (result != null) {
+        return result;
+      }
       if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
@@ -495,6 +523,16 @@ public class RegionWriteExecutor {
         InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
       ISchemaRegion schemaRegion =
           SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+      RegionExecutionResult result;
+      for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry 
:
+          node.getDeviceMap().entrySet()) {
+        result =
+            checkQuotaBeforeCreatingTimeSeries(
+                schemaRegion, deviceEntry.getKey(), 
deviceEntry.getValue().getRight().size());
+        if (result != null) {
+          return result;
+        }
+      }
       if 
(config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
@@ -547,6 +585,25 @@ public class RegionWriteExecutor {
       }
     }
 
+    /**
+     * Check the quota before creating time series.
+     *
+     * @return null if the quota is not exceeded, otherwise return the 
execution result.
+     */
+    private RegionExecutionResult checkQuotaBeforeCreatingTimeSeries(
+        ISchemaRegion schemaRegion, PartialPath path, int size) {
+      try {
+        schemaRegion.checkSchemaQuota(path, size);
+      } catch (SchemaQuotaExceededException e) {
+        RegionExecutionResult result = new RegionExecutionResult();
+        result.setAccepted(false);
+        result.setMessage(e.getMessage());
+        result.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+        return result;
+      }
+      return null;
+    }
+
     private RegionExecutionResult processExecutionResultOfInternalCreateSchema(
         RegionExecutionResult executionResult,
         List<TSStatus> failingStatus,
@@ -617,7 +674,12 @@ public class RegionWriteExecutor {
           result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));
           return result;
         }
-        return super.visitActivateTemplate(node, context);
+        ISchemaRegion schemaRegion =
+            SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
+        RegionExecutionResult result =
+            checkQuotaBeforeCreatingTimeSeries(
+                schemaRegion, node.getActivatePath(), 
templateSetInfo.left.getMeasurementNumber());
+        return result == null ? super.visitActivateTemplate(node, context) : 
result;
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
@@ -629,6 +691,8 @@ public class RegionWriteExecutor {
       // activate template operation shall be blocked by unset template check
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
+        ISchemaRegion schemaRegion =
+            SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
         for (PartialPath devicePath : 
node.getTemplateActivationMap().keySet()) {
           Pair<Template, PartialPath> templateSetInfo =
               
ClusterTemplateManager.getInstance().checkTemplateSetInfo(devicePath);
@@ -645,6 +709,12 @@ public class RegionWriteExecutor {
             result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));
             return result;
           }
+          RegionExecutionResult result =
+              checkQuotaBeforeCreatingTimeSeries(
+                  schemaRegion, devicePath, 
templateSetInfo.left.getMeasurementNumber());
+          if (result != null) {
+            return result;
+          }
         }
 
         return super.visitBatchActivateTemplate(node, context);
@@ -659,6 +729,8 @@ public class RegionWriteExecutor {
       // activate template operation shall be blocked by unset template check
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
+        ISchemaRegion schemaRegion =
+            SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) 
context.getRegionId());
         for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
             node.getTemplateActivationMap().entrySet()) {
           Pair<Template, PartialPath> templateSetInfo =
@@ -678,6 +750,12 @@ public class RegionWriteExecutor {
             result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));
             return result;
           }
+          RegionExecutionResult result =
+              checkQuotaBeforeCreatingTimeSeries(
+                  schemaRegion, entry.getKey(), 
templateSetInfo.left.getMeasurementNumber());
+          if (result != null) {
+            return result;
+          }
         }
 
         return super.visitInternalBatchActivateTemplate(node, context);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
index 40cd224e2f..bfbbe1c633 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
@@ -41,6 +41,7 @@ import java.util.Map;
 
 public class InternalCreateMultiTimeSeriesNode extends WritePlanNode {
 
+  // <DevicePath, <IsAligned, MeasurementGroup>>
   private Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap;
 
   private TRegionReplicaSet regionReplicaSet;

Reply via email to