This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3d9cd3c3052 Refactor Series/Device Quota Limitation (#11422)
3d9cd3c3052 is described below
commit 3d9cd3c305283b12586011ebbcd5ceeb9dff3dea
Author: Chen YZ <[email protected]>
AuthorDate: Wed Nov 1 17:32:13 2023 +0800
Refactor Series/Device Quota Limitation (#11422)
---
.../it/env/cluster/config/MppCommonConfig.java | 8 +-
.../env/cluster/config/MppSharedCommonConfig.java | 12 +--
.../it/env/remote/config/RemoteCommonConfig.java | 6 +-
.../org/apache/iotdb/itbase/env/CommonConfig.java | 4 +-
.../{ => quota}/IoTDBClusterDeviceQuotaIT.java | 7 +-
.../IoTDBClusterMixQuotaIT.java} | 77 ++++++++------------
.../IoTDBClusterQuotaIT.java} | 39 +++-------
.../IoTDBClusterTimeSeriesQuotaIT.java} | 10 +--
.../heartbeat/DataNodeHeartbeatHandler.java | 31 +++-----
.../iotdb/confignode/manager/ConfigManager.java | 6 +-
.../manager/load/service/HeartbeatService.java | 9 ++-
.../manager/schema/ClusterSchemaManager.java | 31 +++++++-
.../schema/ClusterSchemaQuotaStatistics.java | 73 ++++++++++++++++---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 ------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 26 -------
.../metadata/SchemaQuotaExceededException.java | 8 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 6 +-
.../config/executor/ClusterConfigTaskExecutor.java | 18 +++++
.../apache/iotdb/db/schemaengine/SchemaEngine.java | 66 ++++++++---------
.../rescon/DataNodeSchemaQuotaManager.java | 85 +++++++++++++---------
.../rescon/ISchemaEngineStatistics.java | 2 +
.../rescon/MemSchemaEngineStatistics.java | 14 +++-
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 11 +--
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 11 +--
.../template/ClusterTemplateManager.java | 4 +
.../resources/conf/iotdb-common.properties | 20 +++--
.../apache/iotdb/commons/conf/CommonConfig.java | 20 +++++
.../iotdb/commons/conf/CommonDescriptor.java | 10 +++
.../src/main/thrift/datanode.thrift | 9 ++-
29 files changed, 357 insertions(+), 288 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 02f89b74652..32c2539776d 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -350,14 +350,14 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
}
@Override
- public CommonConfig setClusterSchemaLimitLevel(String
clusterSchemaLimitLevel) {
- setProperty("cluster_schema_limit_level", clusterSchemaLimitLevel);
+ public CommonConfig setClusterTimeseriesLimitThreshold(long
clusterSchemaLimitThreshold) {
+ setProperty("cluster_timeseries_limit_threshold",
String.valueOf(clusterSchemaLimitThreshold));
return this;
}
@Override
- public CommonConfig setClusterSchemaLimitThreshold(long
clusterSchemaLimitThreshold) {
- setProperty("cluster_schema_limit_threshold",
String.valueOf(clusterSchemaLimitThreshold));
+ public CommonConfig setClusterDeviceLimitThreshold(long
clusterDeviceLimitThreshold) {
+ setProperty("cluster_device_limit_threshold",
String.valueOf(clusterDeviceLimitThreshold));
return this;
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 27d973311e5..2d54c4dbfdb 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -352,16 +352,16 @@ public class MppSharedCommonConfig implements
CommonConfig {
}
@Override
- public CommonConfig setClusterSchemaLimitLevel(String
clusterSchemaLimitLevel) {
- dnConfig.setClusterSchemaLimitLevel(clusterSchemaLimitLevel);
- cnConfig.setClusterSchemaLimitLevel(clusterSchemaLimitLevel);
+ public CommonConfig setClusterTimeseriesLimitThreshold(long
clusterSchemaLimitThreshold) {
+ dnConfig.setClusterTimeseriesLimitThreshold(clusterSchemaLimitThreshold);
+ cnConfig.setClusterTimeseriesLimitThreshold(clusterSchemaLimitThreshold);
return this;
}
@Override
- public CommonConfig setClusterSchemaLimitThreshold(long
clusterSchemaLimitThreshold) {
- dnConfig.setClusterSchemaLimitThreshold(clusterSchemaLimitThreshold);
- cnConfig.setClusterSchemaLimitThreshold(clusterSchemaLimitThreshold);
+ public CommonConfig setClusterDeviceLimitThreshold(long
clusterDeviceLimitThreshold) {
+ dnConfig.setClusterDeviceLimitThreshold(clusterDeviceLimitThreshold);
+ cnConfig.setClusterDeviceLimitThreshold(clusterDeviceLimitThreshold);
return this;
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 81d1bb8d1f2..64cc2eccae5 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -253,13 +253,13 @@ public class RemoteCommonConfig implements CommonConfig {
}
@Override
- public CommonConfig setClusterSchemaLimitLevel(String
clusterSchemaLimitLevel) {
+ public CommonConfig setClusterTimeseriesLimitThreshold(long
clusterSchemaLimitThreshold) {
return this;
}
@Override
- public CommonConfig setClusterSchemaLimitThreshold(long
clusterSchemaLimitThreshold) {
- return this;
+ public CommonConfig setClusterDeviceLimitThreshold(long
clusterDeviceLimitThreshold) {
+ return null;
}
@Override
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 61ab834539e..ef2c1678981 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -108,9 +108,9 @@ public interface CommonConfig {
CommonConfig setWriteMemoryProportion(String writeMemoryProportion);
- CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel);
+ CommonConfig setClusterTimeseriesLimitThreshold(long
clusterTimeseriesLimitThreshold);
- CommonConfig setClusterSchemaLimitThreshold(long
clusterSchemaLimitThreshold);
+ CommonConfig setClusterDeviceLimitThreshold(long
clusterDeviceLimitThreshold);
CommonConfig setDatabaseLimitThreshold(long databaseLimitThreshold);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterDeviceQuotaIT.java
similarity index 85%
copy from
integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
copy to
integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterDeviceQuotaIT.java
index d2eca143a2f..0d62082f367 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterDeviceQuotaIT.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.it.schema;
+package org.apache.iotdb.db.it.schema.quota;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.util.AbstractSchemaIT;
import org.junit.runners.Parameterized;
-public class IoTDBClusterDeviceQuotaIT extends IoTDBClusterMeasurementQuotaIT {
+public class IoTDBClusterDeviceQuotaIT extends IoTDBClusterQuotaIT {
public IoTDBClusterDeviceQuotaIT(AbstractSchemaIT.SchemaTestMode
schemaTestMode) {
super(schemaTestMode);
}
@@ -31,8 +31,7 @@ public class IoTDBClusterDeviceQuotaIT extends
IoTDBClusterMeasurementQuotaIT {
@Parameterized.BeforeParam
public static void before() throws Exception {
setUpEnvironment();
-
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitLevel("device");
-
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitThreshold(3);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterDeviceLimitThreshold(3);
EnvFactory.getEnv().initClusterEnvironment();
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterMixQuotaIT.java
similarity index 57%
copy from
integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
copy to
integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterMixQuotaIT.java
index 42ab75671d5..e910918e1c4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterMixQuotaIT.java
@@ -16,30 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.it.schema;
+
+package org.apache.iotdb.db.it.schema.quota;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.util.AbstractSchemaIT;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;
import java.sql.Connection;
-import java.sql.SQLException;
import java.sql.Statement;
-public class IoTDBClusterMeasurementQuotaIT extends AbstractSchemaIT {
- public IoTDBClusterMeasurementQuotaIT(SchemaTestMode schemaTestMode) {
+public class IoTDBClusterMixQuotaIT extends IoTDBClusterQuotaIT {
+ public IoTDBClusterMixQuotaIT(AbstractSchemaIT.SchemaTestMode
schemaTestMode) {
super(schemaTestMode);
}
@Parameterized.BeforeParam
public static void before() throws Exception {
setUpEnvironment();
-
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitLevel("timeseries");
-
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitThreshold(6);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterDeviceLimitThreshold(3);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterTimeseriesLimitThreshold(6);
EnvFactory.getEnv().initClusterEnvironment();
}
@@ -49,37 +48,8 @@ public class IoTDBClusterMeasurementQuotaIT extends
AbstractSchemaIT {
tearDownEnvironment();
}
- @Before
- public void setUp() throws Exception {
- prepareTimeSeries();
- Thread.sleep(2000); // wait heartbeat
- }
-
- /** Prepare time series: 2 databases, 3 devices, 6 time series */
- private void prepareTimeSeries() throws SQLException {
- try (Connection connection = EnvFactory.getEnv().getConnection();
- Statement statement = connection.createStatement()) {
- // create database
- statement.execute("CREATE DATABASE root.sg1");
- statement.execute("CREATE DATABASE root.sg2");
- // create device template
- statement.execute("CREATE DEVICE TEMPLATE t1 (s1 INT64, s2 DOUBLE)");
- // set device template
- statement.execute("SET DEVICE TEMPLATE t1 TO root.sg2");
- statement.execute(
- "create timeseries root.sg1.d0.s0 with datatype=FLOAT, encoding=RLE,
compression=SNAPPY");
- statement.execute(
- "create timeseries root.sg1.d0.s1 with datatype=FLOAT, encoding=RLE,
compression=SNAPPY");
- statement.execute(
- "create timeseries root.sg1.d1.s0 with datatype=FLOAT, encoding=RLE,
compression=SNAPPY");
- statement.execute(
- "create timeseries root.sg1.d1.s1 with datatype=FLOAT, encoding=RLE,
compression=SNAPPY");
- statement.execute("create timeseries of device template on
root.sg2.d1;");
- }
- }
-
@Test
- public void testClusterSchemaQuota() {
+ public void testClusterSchemaQuota2() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try {
@@ -87,23 +57,19 @@ public class IoTDBClusterMeasurementQuotaIT extends
AbstractSchemaIT {
"create timeseries root.sg1.d3.s0 with datatype=FLOAT,
encoding=RLE, compression=SNAPPY");
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(
- e.getMessage()
- .contains("The current metadata capacity has exceeded the
cluster quota"));
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
}
try {
statement.execute("create timeseries of device template on
root.sg2.d2;");
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(
- e.getMessage()
- .contains("The current metadata capacity has exceeded the
cluster quota"));
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
}
// delete some timeseries and database
statement.execute("delete database root.sg2;");
statement.execute("delete timeseries root.sg1.d0.s0;");
Thread.sleep(2000); // wait heartbeat
- // now we can create 3 new timeseries or 1 new device
+ // now we can create 3 new timeseries and 1 new device
statement.execute("SET DEVICE TEMPLATE t1 TO root.sg1.d4");
statement.execute("create timeseries of device template on root.sg1.d4");
statement.execute(
@@ -114,9 +80,26 @@ public class IoTDBClusterMeasurementQuotaIT extends
AbstractSchemaIT {
"create timeseries root.sg1.d3.s0 with datatype=FLOAT,
encoding=RLE, compression=SNAPPY");
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(
- e.getMessage()
- .contains("The current metadata capacity has exceeded the
cluster quota"));
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
+ }
+ try {
+ statement.execute(
+ "create timeseries root.sg1.d1.s4 with datatype=FLOAT,
encoding=RLE, compression=SNAPPY");
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
+ }
+ try {
+ statement.execute("insert into root.sg1.d1(timestamp,s4)
values(1,1.0)");
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
+ }
+ try {
+ statement.execute("insert into root.sg1.d4(timestamp,s4)
values(1,1.0)");
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
}
} catch (Exception e) {
e.printStackTrace();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterQuotaIT.java
similarity index 76%
rename from
integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterQuotaIT.java
index 42ab75671d5..b772bd796a6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterQuotaIT.java
@@ -16,45 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.it.schema;
+package org.apache.iotdb.db.it.schema.quota;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.util.AbstractSchemaIT;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-public class IoTDBClusterMeasurementQuotaIT extends AbstractSchemaIT {
- public IoTDBClusterMeasurementQuotaIT(SchemaTestMode schemaTestMode) {
+public abstract class IoTDBClusterQuotaIT extends AbstractSchemaIT {
+ public IoTDBClusterQuotaIT(SchemaTestMode schemaTestMode) {
super(schemaTestMode);
}
- @Parameterized.BeforeParam
- public static void before() throws Exception {
- setUpEnvironment();
-
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitLevel("timeseries");
-
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitThreshold(6);
- EnvFactory.getEnv().initClusterEnvironment();
- }
-
- @Parameterized.AfterParam
- public static void after() throws Exception {
- EnvFactory.getEnv().cleanClusterEnvironment();
- tearDownEnvironment();
- }
-
@Before
public void setUp() throws Exception {
prepareTimeSeries();
Thread.sleep(2000); // wait heartbeat
}
+ @After
+ public void tearDown() throws Exception {
+ clearSchema();
+ }
+
/** Prepare time series: 2 databases, 3 devices, 6 time series */
private void prepareTimeSeries() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
@@ -87,17 +78,13 @@ public class IoTDBClusterMeasurementQuotaIT extends
AbstractSchemaIT {
"create timeseries root.sg1.d3.s0 with datatype=FLOAT,
encoding=RLE, compression=SNAPPY");
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(
- e.getMessage()
- .contains("The current metadata capacity has exceeded the
cluster quota"));
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
}
try {
statement.execute("create timeseries of device template on
root.sg2.d2;");
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(
- e.getMessage()
- .contains("The current metadata capacity has exceeded the
cluster quota"));
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
}
// delete some timeseries and database
statement.execute("delete database root.sg2;");
@@ -114,9 +101,7 @@ public class IoTDBClusterMeasurementQuotaIT extends
AbstractSchemaIT {
"create timeseries root.sg1.d3.s0 with datatype=FLOAT,
encoding=RLE, compression=SNAPPY");
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(
- e.getMessage()
- .contains("The current metadata capacity has exceeded the
cluster quota"));
+ Assert.assertTrue(e.getMessage().contains("capacity has exceeded the
cluster quota"));
}
} catch (Exception e) {
e.printStackTrace();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterTimeSeriesQuotaIT.java
similarity index 80%
rename from
integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterTimeSeriesQuotaIT.java
index d2eca143a2f..ce86ba56c4d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/quota/IoTDBClusterTimeSeriesQuotaIT.java
@@ -16,23 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.it.schema;
+
+package org.apache.iotdb.db.it.schema.quota;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.util.AbstractSchemaIT;
import org.junit.runners.Parameterized;
-public class IoTDBClusterDeviceQuotaIT extends IoTDBClusterMeasurementQuotaIT {
- public IoTDBClusterDeviceQuotaIT(AbstractSchemaIT.SchemaTestMode
schemaTestMode) {
+public class IoTDBClusterTimeSeriesQuotaIT extends IoTDBClusterQuotaIT {
+ public IoTDBClusterTimeSeriesQuotaIT(AbstractSchemaIT.SchemaTestMode
schemaTestMode) {
super(schemaTestMode);
}
@Parameterized.BeforeParam
public static void before() throws Exception {
setUpEnvironment();
-
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitLevel("device");
-
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitThreshold(3);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setClusterTimeseriesLimitThreshold(6);
EnvFactory.getEnv().initClusterEnvironment();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 33ae5ca3835..90ce2c347e4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -45,7 +45,8 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
private final Map<Integer, Long> timeSeriesNum;
private final Map<Integer, Long> regionDisk;
- private final Consumer<Map<Integer, Long>> schemaQuotaRespProcess;
+ private final Consumer<Map<Integer, Long>> seriesUsageRespProcess;
+ private final Consumer<Map<Integer, Long>> deviceUsageRespProcess;
private final PipeRuntimeCoordinator pipeRuntimeCoordinator;
@@ -55,7 +56,8 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
Map<Integer, Long> deviceNum,
Map<Integer, Long> timeSeriesNum,
Map<Integer, Long> regionDisk,
- Consumer<Map<Integer, Long>> schemaQuotaRespProcess,
+ Consumer<Map<Integer, Long>> seriesUsageRespProcess,
+ Consumer<Map<Integer, Long>> deviceUsageRespProcess,
PipeRuntimeCoordinator pipeRuntimeCoordinator) {
this.nodeId = nodeId;
@@ -63,7 +65,8 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
this.deviceNum = deviceNum;
this.timeSeriesNum = timeSeriesNum;
this.regionDisk = regionDisk;
- this.schemaQuotaRespProcess = schemaQuotaRespProcess;
+ this.seriesUsageRespProcess = seriesUsageRespProcess;
+ this.deviceUsageRespProcess = deviceUsageRespProcess;
this.pipeRuntimeCoordinator = pipeRuntimeCoordinator;
}
@@ -96,27 +99,17 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
}
});
- if (heartbeatResp.getRegionDeviceNumMap() != null) {
- deviceNum.putAll(heartbeatResp.getRegionDeviceNumMap());
+ if (heartbeatResp.getRegionDeviceUsageMap() != null) {
+ deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap());
+ deviceUsageRespProcess.accept(heartbeatResp.getRegionDeviceUsageMap());
}
- if (heartbeatResp.getRegionTimeSeriesNumMap() != null) {
- timeSeriesNum.putAll(heartbeatResp.getRegionTimeSeriesNumMap());
+ if (heartbeatResp.getRegionSeriesUsageMap() != null) {
+ timeSeriesNum.putAll(heartbeatResp.getRegionSeriesUsageMap());
+ seriesUsageRespProcess.accept(heartbeatResp.getRegionSeriesUsageMap());
}
if (heartbeatResp.getRegionDisk() != null) {
regionDisk.putAll(heartbeatResp.getRegionDisk());
}
- if (heartbeatResp.getSchemaLimitLevel() != null) {
- switch (heartbeatResp.getSchemaLimitLevel()) {
- case DEVICE:
- schemaQuotaRespProcess.accept(heartbeatResp.getRegionDeviceNumMap());
- break;
- case TIMESERIES:
-
schemaQuotaRespProcess.accept(heartbeatResp.getRegionTimeSeriesNumMap());
- break;
- default:
- break;
- }
- }
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(nodeId,
heartbeatResp.getPipeMetaList());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index ba26a7e951a..94d93bc3124 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -275,7 +275,11 @@ public class ConfigManager implements IManager {
// Build the manager module
this.nodeManager = new NodeManager(this, nodeInfo);
this.clusterSchemaManager =
- new ClusterSchemaManager(this, clusterSchemaInfo, new
ClusterSchemaQuotaStatistics());
+ new ClusterSchemaManager(
+ this,
+ clusterSchemaInfo,
+ new ClusterSchemaQuotaStatistics(
+ COMMON_CONF.getSeriesLimitThreshold(),
COMMON_CONF.getDeviceLimitThreshold()));
this.partitionManager = new PartitionManager(this, partitionInfo);
this.permissionManager = new PermissionManager(this, authorInfo);
this.procedureManager = new ProcedureManager(this, procedureInfo);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 9e8f7fdabaf..284a4d85bbd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import
org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,7 +127,10 @@ public class HeartbeatService {
heartbeatReq.setNeedJudgeLeader(true);
// We sample DataNode's load in every 10 heartbeat loop
heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
-
heartbeatReq.setSchemaQuotaCount(configManager.getClusterSchemaManager().getSchemaQuotaCount());
+ Pair<Long, Long> schemaQuotaRemain =
+ configManager.getClusterSchemaManager().getSchemaQuotaRemain();
+ heartbeatReq.setTimeSeriesQuotaRemain(schemaQuotaRemain.left);
+ heartbeatReq.setDeviceQuotaRemain(schemaQuotaRemain.right);
// We collect pipe meta in every 100 heartbeat loop
heartbeatReq.setNeedPipeMetaList(
!PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled()
@@ -186,7 +190,8 @@ public class HeartbeatService {
configManager.getClusterQuotaManager().getDeviceNum(),
configManager.getClusterQuotaManager().getTimeSeriesNum(),
configManager.getClusterQuotaManager().getRegionDisk(),
- configManager.getClusterSchemaManager()::updateSchemaQuota,
+ configManager.getClusterSchemaManager()::updateTimeSeriesUsage,
+ configManager.getClusterSchemaManager()::updateDeviceUsage,
configManager.getPipeManager().getPipeRuntimeCoordinator());
configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
AsyncDataNodeHeartbeatClientPool.getInstance()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index 96a11483f70..6a2bd48a2f7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -1077,12 +1077,35 @@ public class ClusterSchemaManager {
}
}
- public long getSchemaQuotaCount() {
- return
schemaQuotaStatistics.getSchemaQuotaCount(getPartitionManager().getAllSchemaPartition());
+ /**
+ * Only leader use this interface. Get the remain schema quota of specified
schema region.
+ *
+ * @return pair of <series quota, device quota>, -1 means no limit
+ */
+ public Pair<Long, Long> getSchemaQuotaRemain() {
+ boolean isDeviceLimit = schemaQuotaStatistics.getDeviceThreshold() != -1;
+ boolean isSeriesLimit = schemaQuotaStatistics.getSeriesThreshold() != -1;
+ if (isSeriesLimit || isDeviceLimit) {
+ Set<Integer> schemaPartitionSet =
getPartitionManager().getAllSchemaPartition();
+ return new Pair<>(
+ isSeriesLimit ?
schemaQuotaStatistics.getSeriesQuotaRemain(schemaPartitionSet) : -1L,
+ isDeviceLimit ?
schemaQuotaStatistics.getDeviceQuotaRemain(schemaPartitionSet) : -1L);
+ } else {
+ return new Pair<>(-1L, -1L);
+ }
+ }
+
+ public void updateTimeSeriesUsage(Map<Integer, Long> seriesUsage) {
+ schemaQuotaStatistics.updateTimeSeriesUsage(seriesUsage);
+ }
+
+ public void updateDeviceUsage(Map<Integer, Long> deviceUsage) {
+ schemaQuotaStatistics.updateDeviceUsage(deviceUsage);
}
- public void updateSchemaQuota(Map<Integer, Long> schemaCountMap) {
- schemaQuotaStatistics.updateCount(schemaCountMap);
+ public void updateSchemaQuotaConfiguration(long seriesThreshold, long
deviceThreshold) {
+ schemaQuotaStatistics.setDeviceThreshold(deviceThreshold);
+ schemaQuotaStatistics.setSeriesThreshold(seriesThreshold);
}
public void clearSchemaQuotaCache() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
index 935ca04bb57..8d87d5b2715 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
@@ -27,20 +27,75 @@ import java.util.concurrent.ConcurrentHashMap;
public class ClusterSchemaQuotaStatistics {
// TODO: it can be merged with statistics in ClusterQuotaManager
- private final Map<Integer, Long> countMap = new ConcurrentHashMap<>();
+ private long seriesThreshold;
+ private long deviceThreshold;
- public void updateCount(@NotNull Map<Integer, Long> schemaCountMap) {
- countMap.putAll(schemaCountMap);
+ private final Map<Integer, Long> seriesCountMap = new ConcurrentHashMap<>();
+ private final Map<Integer, Long> deviceCountMap = new ConcurrentHashMap<>();
+
+ public ClusterSchemaQuotaStatistics(long seriesThreshold, long
deviceThreshold) {
+ this.seriesThreshold = seriesThreshold;
+ this.deviceThreshold = deviceThreshold;
+ }
+
+ public void updateTimeSeriesUsage(@NotNull Map<Integer, Long> seriesUsage) {
+ seriesCountMap.putAll(seriesUsage);
+ }
+
+ public void updateDeviceUsage(@NotNull Map<Integer, Long> deviceUsage) {
+ deviceCountMap.putAll(deviceUsage);
+ }
+
+ /**
+ * Get the remain quota of series and device for the given consensus group.
+ *
+ * @param consensusGroupIdSet the consensus group id set
+ * @return the remain quota, >=0
+ */
+ public long getSeriesQuotaRemain(Set<Integer> consensusGroupIdSet) {
+ long res =
+ seriesThreshold
+ - seriesCountMap.entrySet().stream()
+ .filter(i -> consensusGroupIdSet.contains(i.getKey()))
+ .mapToLong(Map.Entry::getValue)
+ .sum();
+ return res > 0 ? res : 0;
+ }
+
+ /**
+ * Get the remain quota of device for the given consensus group.
+ *
+ * @param consensusGroupIdSet the consensus group id set
+ * @return the remain quota, >=0
+ */
+ public long getDeviceQuotaRemain(Set<Integer> consensusGroupIdSet) {
+ long res =
+ deviceThreshold
+ - deviceCountMap.entrySet().stream()
+ .filter(i -> consensusGroupIdSet.contains(i.getKey()))
+ .mapToLong(Map.Entry::getValue)
+ .sum();
+ return res > 0 ? res : 0;
+ }
+
+ public long getSeriesThreshold() {
+ return seriesThreshold;
+ }
+
+ public void setSeriesThreshold(long seriesThreshold) {
+ this.seriesThreshold = seriesThreshold;
+ }
+
+ public long getDeviceThreshold() {
+ return deviceThreshold;
}
- public long getSchemaQuotaCount(Set<Integer> consensusGroupIdSet) {
- return countMap.entrySet().stream()
- .filter(i -> consensusGroupIdSet.contains(i.getKey()))
- .mapToLong(Map.Entry::getValue)
- .sum();
+ public void setDeviceThreshold(long deviceThreshold) {
+ this.deviceThreshold = deviceThreshold;
}
public void clear() {
- countMap.clear();
+ seriesCountMap.clear();
+ deviceCountMap.clear();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 2e0a06e912e..f0ec678e894 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1057,12 +1057,6 @@ public class IoTDBConfig {
/** whether to enable the audit log * */
private boolean enableAuditLog = false;
- /** This configuration parameter sets the level at which the time series
limit is applied.* */
- private String clusterSchemaLimitLevel = "timeseries";
-
- /** This configuration parameter sets the maximum number of schema allowed
in the cluster.* */
- private long clusterSchemaLimitThreshold = -1;
-
/** Output location of audit logs * */
private List<AuditLogStorage> auditLogStorage =
Arrays.asList(AuditLogStorage.IOTDB, AuditLogStorage.LOGGER);
@@ -3763,22 +3757,6 @@ public class IoTDBConfig {
return sortTmpDir;
}
- public String getClusterSchemaLimitLevel() {
- return clusterSchemaLimitLevel;
- }
-
- public void setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
- this.clusterSchemaLimitLevel = clusterSchemaLimitLevel;
- }
-
- public long getClusterSchemaLimitThreshold() {
- return clusterSchemaLimitThreshold;
- }
-
- public void setClusterSchemaLimitThreshold(long clusterSchemaLimitThreshold)
{
- this.clusterSchemaLimitThreshold = clusterSchemaLimitThreshold;
- }
-
public String getObjectStorageBucket() {
throw new UnsupportedOperationException("object storage is not supported
yet");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e172782d929..7f3247b7c71 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.schemaengine.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
@@ -226,17 +225,6 @@ public class IoTDBDescriptor {
}
public void loadProperties(Properties properties) throws
BadNodeUrlException, IOException {
- conf.setClusterSchemaLimitLevel(
- properties
- .getProperty("cluster_schema_limit_level",
conf.getClusterSchemaLimitLevel())
- .trim());
- conf.setClusterSchemaLimitThreshold(
- Long.parseLong(
- properties
- .getProperty(
- "cluster_schema_limit_threshold",
- Long.toString(conf.getClusterSchemaLimitThreshold()))
- .trim()));
conf.setClusterName(
properties.getProperty(IoTDBConstant.CLUSTER_NAME,
conf.getClusterName()).trim());
@@ -1635,20 +1623,6 @@ public class IoTDBDescriptor {
// update compaction config
loadCompactionHotModifiedProps(properties);
-
- // update schema quota configuration
- conf.setClusterSchemaLimitLevel(
- properties
- .getProperty("cluster_schema_limit_level",
conf.getClusterSchemaLimitLevel())
- .trim());
- conf.setClusterSchemaLimitThreshold(
- Long.parseLong(
- properties
- .getProperty(
- "cluster_schema_limit_threshold",
- Long.toString(conf.getClusterSchemaLimitThreshold()))
- .trim()));
- DataNodeSchemaQuotaManager.getInstance().updateConfiguration();
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload
configuration because %s", e));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
index bd890602817..52d401bcc66 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
@@ -26,14 +26,12 @@ import org.apache.iotdb.rpc.TSStatusCode;
public class SchemaQuotaExceededException extends MetadataException {
// used for timeseries/device limit
- public SchemaQuotaExceededException(ClusterSchemaQuotaLevel level, long
limit) {
+ public SchemaQuotaExceededException(ClusterSchemaQuotaLevel level) {
super(
String.format(
"The current metadata capacity has exceeded the cluster quota. "
- + "The cluster quota is set at the %s level, with a limit
number of %d. "
- + "Please review your configuration "
- + "or delete some existing time series to comply with the
quota.",
- level.toString(), limit),
+ + "Please review your configuration on ConfigNode or delete
some existing %s to comply with the quota.",
+ level),
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index c24c4000075..1f49bae4ecd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1162,9 +1162,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
if (req.getSchemaRegionIds() != null) {
spaceQuotaManager.updateSpaceQuotaUsage(req.getSpaceQuotaUsage());
- resp.setRegionDeviceNumMap(
+ resp.setRegionDeviceUsageMap(
schemaEngine.countDeviceNumBySchemaRegion(req.getSchemaRegionIds()));
- resp.setRegionTimeSeriesNumMap(
+ resp.setRegionSeriesUsageMap(
schemaEngine.countTimeSeriesNumBySchemaRegion(req.getSchemaRegionIds()));
}
if (req.getDataRegionIds() != null) {
@@ -1172,7 +1172,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
resp.setRegionDisk(spaceQuotaManager.getRegionDisk());
}
// Update schema quota if necessary
-
SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount,
resp);
+ SchemaEngine.getInstance().updateAndFillSchemaCountMap(req, resp);
// Update pipe meta if necessary
if (req.isNeedPipeMetaList()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index c3803213c99..5ad9b4ea985 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
@@ -176,6 +177,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetSpaceQuotaSta
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.SetThrottleQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowSpaceQuotaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowThrottleQuotaStatement;
+import org.apache.iotdb.db.schemaengine.SchemaEngine;
+import org.apache.iotdb.db.schemaengine.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
@@ -1425,6 +1428,21 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
duplicateMeasurement)));
return future;
}
+ // check schema quota
+ long localNeedQuota =
+ (long) templateExtendInfo.getMeasurements().size()
+ * SchemaEngine.getInstance()
+ .getSchemaEngineStatistics()
+
.getTemplateUsingNumber(templateExtendInfo.getTemplateName());
+ if (localNeedQuota != 0) {
+ try {
+
+ DataNodeSchemaQuotaManager.getInstance().check(localNeedQuota, 0);
+ } catch (SchemaQuotaExceededException e) {
+ future.setException(e);
+ return future;
+ }
+ }
}
TAlterSchemaTemplateReq req = new TAlterSchemaTemplateReq();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index f335304f5c0..25e60a7b61e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -43,8 +43,8 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegionParams;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionLoader;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionParams;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSchemaLimitLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -374,45 +374,39 @@ public class SchemaEngine {
* not -1 and deviceNumMap/timeSeriesNumMap is null, fill
deviceNumMap/timeSeriesNumMap of the
* SchemaRegion whose current node is the leader
*
- * @param totalCount cluster schema usage
+ * @param req heartbeat request
* @param resp heartbeat response
*/
- public void updateAndFillSchemaCountMap(long totalCount, THeartbeatResp
resp) {
+ public void updateAndFillSchemaCountMap(THeartbeatReq req, THeartbeatResp
resp) {
// update DataNodeSchemaQuotaManager
- schemaQuotaManager.updateRemain(totalCount);
- if (schemaQuotaManager.getLimit() < 0) {
- return;
+ schemaQuotaManager.updateRemain(
+ req.getTimeSeriesQuotaRemain(),
+ req.isSetDeviceQuotaRemain() ? req.getDeviceQuotaRemain() : -1);
+ if (schemaQuotaManager.isDeviceLimit()) {
+ if (resp.getRegionDeviceUsageMap() == null) {
+ resp.setRegionDeviceUsageMap(new HashMap<>());
+ }
+ Map<Integer, Long> tmp = resp.getRegionDeviceUsageMap();
+ schemaRegionMap.values().stream()
+ .filter(i ->
SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
+ .forEach(
+ i ->
+ tmp.put(
+ i.getSchemaRegionId().getId(),
+ i.getSchemaRegionStatistics().getDevicesNumber()));
}
- Map<Integer, Long> res = new HashMap<>();
- switch (schemaQuotaManager.getLevel()) {
- case TIMESERIES:
- if (resp.getRegionTimeSeriesNumMap() == null) {
- schemaRegionMap.values().stream()
- .filter(i ->
SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
- .forEach(
- i ->
- res.put(
- i.getSchemaRegionId().getId(),
- i.getSchemaRegionStatistics().getSeriesNumber()));
- resp.setRegionTimeSeriesNumMap(res);
- resp.setSchemaLimitLevel(TSchemaLimitLevel.TIMESERIES);
- }
- break;
- case DEVICE:
- if (resp.getRegionDeviceNumMap() == null) {
- schemaRegionMap.values().stream()
- .filter(i ->
SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
- .forEach(
- i ->
- res.put(
- i.getSchemaRegionId().getId(),
- i.getSchemaRegionStatistics().getDevicesNumber()));
- resp.setRegionDeviceNumMap(res);
- resp.setSchemaLimitLevel(TSchemaLimitLevel.DEVICE);
- }
- break;
- default:
- throw new UnsupportedOperationException();
+ if (schemaQuotaManager.isSeriesLimit()) {
+ if (resp.getRegionSeriesUsageMap() == null) {
+ resp.setRegionSeriesUsageMap(new HashMap<>());
+ }
+ Map<Integer, Long> tmp = resp.getRegionSeriesUsageMap();
+ schemaRegionMap.values().stream()
+ .filter(i ->
SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
+ .forEach(
+ i ->
+ tmp.put(
+ i.getSchemaRegionId().getId(),
+ i.getSchemaRegionStatistics().getSeriesNumber()));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/DataNodeSchemaQuotaManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/DataNodeSchemaQuotaManager.java
index 73cb74b6c20..3f9ac072138 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/DataNodeSchemaQuotaManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/DataNodeSchemaQuotaManager.java
@@ -20,62 +20,81 @@
package org.apache.iotdb.db.schemaengine.rescon;
import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import java.util.concurrent.atomic.AtomicLong;
@SuppressWarnings("java:S6548") // do not warn about singleton class
public class DataNodeSchemaQuotaManager {
+ private boolean seriesLimit = false;
+ private boolean deviceLimit = false;
+ private final AtomicLong seriesRemain = new AtomicLong(0);
+ private final AtomicLong deviceRemain = new AtomicLong(0);
- private ClusterSchemaQuotaLevel level =
- ClusterSchemaQuotaLevel.valueOf(
-
IoTDBDescriptor.getInstance().getConfig().getClusterSchemaLimitLevel().toUpperCase());
- private long limit =
- IoTDBDescriptor.getInstance()
- .getConfig()
- .getClusterSchemaLimitThreshold(); // -1 means no limitation
- private final AtomicLong remain = new AtomicLong(0);
-
- public void updateRemain(long totalCount) {
- this.remain.getAndSet(limit - totalCount);
+ /**
+ * Update the remaining quota.
+ *
+ * @param seriesRemain -1 means no limit, otherwise the remaining series
quota
+ * @param deviceRemain -1 means no limit, otherwise the remaining device
quota
+ */
+ public void updateRemain(long seriesRemain, long deviceRemain) {
+ if (seriesRemain == -1) {
+ this.seriesLimit = false;
+ } else {
+ this.seriesLimit = true;
+ this.seriesRemain.set(seriesRemain);
+ }
+ if (deviceRemain == -1) {
+ this.deviceLimit = false;
+ } else {
+ this.deviceLimit = true;
+ this.deviceRemain.set(deviceRemain);
+ }
}
- public void checkMeasurementLevel(int acquireNumber) throws
SchemaQuotaExceededException {
- if (limit > 0 && level.equals(ClusterSchemaQuotaLevel.TIMESERIES)) {
- if (remain.get() <= 0) {
- throw new SchemaQuotaExceededException(level, limit);
+ private void checkMeasurementLevel(long acquireNumber) throws
SchemaQuotaExceededException {
+ if (seriesLimit) {
+ if (seriesRemain.get() <= 0) {
+ throw new
SchemaQuotaExceededException(ClusterSchemaQuotaLevel.TIMESERIES);
} else {
- remain.addAndGet(-acquireNumber);
+ seriesRemain.addAndGet(-acquireNumber);
}
}
}
- public void checkDeviceLevel() throws SchemaQuotaExceededException {
- if (limit > 0 && level.equals(ClusterSchemaQuotaLevel.DEVICE)) {
- if (remain.get() <= 0) {
- throw new SchemaQuotaExceededException(level, limit);
+ private void checkDeviceLevel() throws SchemaQuotaExceededException {
+ if (deviceLimit) {
+ if (deviceRemain.get() <= 0) {
+ throw new SchemaQuotaExceededException(ClusterSchemaQuotaLevel.DEVICE);
} else {
- remain.addAndGet(-1L);
+ deviceRemain.addAndGet(-1L);
}
}
}
- public void updateConfiguration() {
- this.level =
- ClusterSchemaQuotaLevel.valueOf(
-
IoTDBDescriptor.getInstance().getConfig().getClusterSchemaLimitLevel().toUpperCase());
- long oldLimit = limit;
- this.limit =
IoTDBDescriptor.getInstance().getConfig().getClusterSchemaLimitThreshold();
- this.remain.addAndGet(limit - oldLimit);
+ public void check(long acquireSeriesNumber, int acquireDeviceNumber)
+ throws SchemaQuotaExceededException {
+ if (acquireDeviceNumber > 0) {
+ checkDeviceLevel();
+ }
+ // if the device check passes, check the measurement level
+ try {
+ checkMeasurementLevel(acquireSeriesNumber);
+ } catch (SchemaQuotaExceededException e) {
+ // if the measurement level check fails, roll back the device remain
+ if (acquireDeviceNumber > 0) {
+ deviceRemain.addAndGet(1L);
+ }
+ throw e;
+ }
}
- public ClusterSchemaQuotaLevel getLevel() {
- return level;
+ public boolean isSeriesLimit() {
+ return seriesLimit;
}
- public long getLimit() {
- return limit;
+ public boolean isDeviceLimit() {
+ return deviceLimit;
}
private DataNodeSchemaQuotaManager() {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/ISchemaEngineStatistics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/ISchemaEngineStatistics.java
index 2d68512914f..1cdd6aa11b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/ISchemaEngineStatistics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/ISchemaEngineStatistics.java
@@ -35,6 +35,8 @@ public interface ISchemaEngineStatistics {
long getTemplateSeriesNumber();
+ int getTemplateUsingNumber(String templateName);
+
MemSchemaEngineStatistics getAsMemSchemaEngineStatistics();
CachedSchemaEngineStatistics getAsCachedSchemaEngineStatistics();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java
index a774159da23..e4c2e8d375c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java
@@ -39,6 +39,9 @@ public class MemSchemaEngineStatistics implements
ISchemaEngineStatistics {
private final long memoryCapacity =
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion();
+ private final ClusterTemplateManager clusterTemplateManager =
+ ClusterTemplateManager.getInstance();
+
protected final AtomicLong memoryUsage = new AtomicLong(0);
private final AtomicLong totalSeriesNumber = new AtomicLong(0);
@@ -123,7 +126,6 @@ public class MemSchemaEngineStatistics implements
ISchemaEngineStatistics {
@Override
public long getTemplateSeriesNumber() {
- ClusterTemplateManager clusterTemplateManager =
ClusterTemplateManager.getInstance();
return templateUsage.entrySet().stream()
.mapToLong(
i ->
@@ -132,6 +134,16 @@ public class MemSchemaEngineStatistics implements
ISchemaEngineStatistics {
.sum();
}
+ @Override
+ public int getTemplateUsingNumber(String templateName) {
+ Integer templateId = clusterTemplateManager.getTemplateId(templateName);
+ if (templateId == null) {
+ return 0;
+ } else {
+ return templateUsage.getOrDefault(templateId, 0);
+ }
+ }
+
public void activateTemplate(int templateId) {
templateUsage.compute(templateId, (k, v) -> (v == null) ? 1 : v + 1);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 0d221f3fab8..a2e457c0a7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.filter.SchemaFilterType;
import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
@@ -683,12 +682,10 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
@Override
public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
throws SchemaQuotaExceededException {
- if
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.TIMESERIES)) {
- schemaQuotaManager.checkMeasurementLevel(timeSeriesNum);
- } else if
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.DEVICE)) {
- if (!mtree.checkDeviceNodeExists(devicePath)) {
- schemaQuotaManager.checkDeviceLevel();
- }
+ if (!mtree.checkDeviceNodeExists(devicePath)) {
+ schemaQuotaManager.check(timeSeriesNum, 1);
+ } else {
+ schemaQuotaManager.check(timeSeriesNum, 0);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 1ddb29cee64..26a1887582c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.filter.SchemaFilterType;
import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
@@ -762,12 +761,10 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
@Override
public void checkSchemaQuota(PartialPath devicePath, int timeSeriesNum)
throws SchemaQuotaExceededException {
- if
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.TIMESERIES)) {
- schemaQuotaManager.checkMeasurementLevel(timeSeriesNum);
- } else if
(schemaQuotaManager.getLevel().equals(ClusterSchemaQuotaLevel.DEVICE)) {
- if (!mtree.checkDeviceNodeExists(devicePath)) {
- schemaQuotaManager.checkDeviceLevel();
- }
+ if (!mtree.checkDeviceNodeExists(devicePath)) {
+ schemaQuotaManager.check(timeSeriesNum, 1);
+ } else {
+ schemaQuotaManager.check(timeSeriesNum, 0);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/ClusterTemplateManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/ClusterTemplateManager.java
index 6d37c78634b..f5b5afda68c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/ClusterTemplateManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/ClusterTemplateManager.java
@@ -641,6 +641,10 @@ public class ClusterTemplateManager implements
ITemplateManager {
}
}
+ public Integer getTemplateId(String templateName) {
+ return templateNameMap.get(templateName);
+ }
+
@TestOnly
public void putTemplate(Template template) {
templateIdMap.put(template.getId(), template);
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 079cd39f907..d6089f16dea 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -280,19 +280,17 @@ cluster_name=defaultCluster
# Support FIFO and LRU policy. FIFO takes low cache update overhead. LRU takes
high cache hit rate.
# datanode_schema_cache_eviction_policy=FIFO
-# This configuration parameter sets the level at which the time series limit
is applied.
-# There are two available levels: 'device' and 'timeseries'.
-# 'device': The limit is applied to the number of devices in the cluster.
-# 'timeseries': The limit is applied to the number of timeseries in the
cluster.
-# Set the value to either 'device' or 'timeseries' based on your desired
control level.
-# cluster_schema_limit_level=timeseries
-
-# This configuration parameter sets the maximum number of schema allowed in
the cluster.
+# This configuration parameter sets the maximum number of time series allowed
in the cluster.
# The value should be a positive integer representing the desired threshold.
# When the threshold is reached, users will be prohibited from creating new
time series.
-# Set the value based on the desired maximum number of schema for your IoTDB
cluster.
-# -1 means the system does not impose a limit on the maximum number of time
series.
-# cluster_schema_limit_threshold=-1
+# -1 means unlimited
+# cluster_timeseries_limit_threshold=-1
+
+# This configuration parameter sets the maximum number of devices allowed in
the cluster.
+# The value should be a positive integer representing the desired threshold.
+# When the threshold is reached, users will be prohibited from creating new
devices.
+# -1 means unlimited
+# cluster_device_limit_threshold=-1
# This configuration parameter sets the maximum number of Cluster Databases
allowed.
# The value should be a positive integer representing the desired threshold.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index df94ca21013..e4abbc10f89 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -205,6 +205,10 @@ public class CommonConfig {
private long datanodeTokenTimeoutMS = 180 * 1000; // 3 minutes
+ // timeseries and device limit
+ private long seriesLimitThreshold = -1;
+ private long deviceLimitThreshold = -1;
+
CommonConfig() {
// Empty constructor
}
@@ -815,4 +819,20 @@ public class CommonConfig {
public void setDatanodeTokenTimeoutMS(long timeoutMS) {
this.datanodeTokenTimeoutMS = timeoutMS;
}
+
+ public long getSeriesLimitThreshold() {
+ return seriesLimitThreshold;
+ }
+
+ public void setSeriesLimitThreshold(long seriesLimitThreshold) {
+ this.seriesLimitThreshold = seriesLimitThreshold;
+ }
+
+ public long getDeviceLimitThreshold() {
+ return deviceLimitThreshold;
+ }
+
+ public void setDeviceLimitThreshold(long deviceLimitThreshold) {
+ this.deviceLimitThreshold = deviceLimitThreshold;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index bc254f832b1..cc794954a78 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -230,6 +230,16 @@ public class CommonDescriptor {
Integer.parseInt(
properties.getProperty(
"database_limit_threshold",
String.valueOf(config.getDatabaseLimitThreshold()))));
+ config.setSeriesLimitThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "cluster_timeseries_limit_threshold",
+ String.valueOf(config.getSeriesLimitThreshold()))));
+ config.setDeviceLimitThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "cluster_device_limit_threshold",
+ String.valueOf(config.getDeviceLimitThreshold()))));
}
private void loadPipeProps(Properties properties) {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 11a322fe2c0..d1502b7f401 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -244,11 +244,12 @@ struct THeartbeatReq {
1: required i64 heartbeatTimestamp
2: required bool needJudgeLeader
3: required bool needSamplingLoad
- 4: required i64 schemaQuotaCount
+ 4: required i64 timeSeriesQuotaRemain
5: optional list<i32> schemaRegionIds
6: optional list<i32> dataRegionIds
7: optional map<string, common.TSpaceQuota> spaceQuotaUsage
8: optional bool needPipeMetaList
+ 9: optional i64 deviceQuotaRemain
}
struct THeartbeatResp {
@@ -257,10 +258,10 @@ struct THeartbeatResp {
3: optional string statusReason
4: optional map<common.TConsensusGroupId, bool> judgedLeaders
5: optional TLoadSample loadSample
- 6: optional map<i32, i64> regionDeviceNumMap
- 7: optional map<i32, i64> regionTimeSeriesNumMap
+ 6: optional map<i32, i64> regionSeriesUsageMap
+ 7: optional map<i32, i64> regionDeviceUsageMap
8: optional map<i32, i64> regionDisk
- // TODO: schemaLimitLevel can be removed if confignode support hot load
configuration
+ // TODO: schemaLimitLevel is unused since 1.3.0; it is kept for compatibility
9: optional TSchemaLimitLevel schemaLimitLevel
10: optional list<binary> pipeMetaList
}