This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5fd2c9cd45c Implement schema quota for activation in table model
5fd2c9cd45c is described below
commit 5fd2c9cd45cba16c7365706e4e5bb3cbb1f39a2c
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 18 19:44:32 2024 +0800
Implement schema quota for activation in table model
---
.../manager/schema/ClusterSchemaManager.java | 7 +++--
.../apache/iotdb/db/schemaengine/SchemaEngine.java | 33 ++++++++++++++++++----
.../rescon/ISchemaRegionStatistics.java | 6 +++-
.../rescon/MemSchemaRegionStatistics.java | 6 ++++
.../apache/iotdb/commons/schema/table/TsTable.java | 15 ++++++++++
5 files changed, 58 insertions(+), 9 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index 5ff65c4de9f..46f71275080 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -1139,15 +1139,16 @@ public class ClusterSchemaManager {
}
}
- public void updateTimeSeriesUsage(Map<Integer, Long> seriesUsage) {
+ public void updateTimeSeriesUsage(final Map<Integer, Long> seriesUsage) {
schemaQuotaStatistics.updateTimeSeriesUsage(seriesUsage);
}
- public void updateDeviceUsage(Map<Integer, Long> deviceUsage) {
+ public void updateDeviceUsage(final Map<Integer, Long> deviceUsage) {
schemaQuotaStatistics.updateDeviceUsage(deviceUsage);
}
- public void updateSchemaQuotaConfiguration(long seriesThreshold, long deviceThreshold) {
+ public void updateSchemaQuotaConfiguration(
+ final long seriesThreshold, final long deviceThreshold) {
schemaQuotaStatistics.setDeviceThreshold(deviceThreshold);
schemaQuotaStatistics.setSeriesThreshold(seriesThreshold);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index 8dc1c338a94..76958eaf1d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -43,6 +45,7 @@ import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegionParams;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionLoader;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionParams;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
@@ -56,6 +59,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -359,8 +363,8 @@ public class SchemaEngine {
return schemaRegionMap == null ? 0 : schemaRegionMap.size();
}
- public Map<Integer, Long> countDeviceNumBySchemaRegion(List<Integer> schemaIds) {
- Map<Integer, Long> deviceNum = new HashMap<>();
+ public Map<Integer, Long> countDeviceNumBySchemaRegion(final List<Integer> schemaIds) {
+ final Map<Integer, Long> deviceNum = new HashMap<>();
schemaRegionMap.entrySet().stream()
.filter(
@@ -375,8 +379,8 @@ public class SchemaEngine {
return deviceNum;
}
- public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(List<Integer> schemaIds) {
- Map<Integer, Long> timeSeriesNum = new HashMap<>();
+ public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(final List<Integer> schemaIds) {
+ final Map<Integer, Long> timeSeriesNum = new HashMap<>();
schemaRegionMap.entrySet().stream()
.filter(
entry ->
@@ -386,7 +390,26 @@ public class SchemaEngine {
entry ->
timeSeriesNum.put(
entry.getKey().getId(),
- entry.getValue().getSchemaRegionStatistics().getSeriesNumber(false)));
+ entry.getValue().getSchemaRegionStatistics().getSeriesNumber(false)
+ + entry
+ .getValue()
+ .getSchemaRegionStatistics()
+ .getTable2DevicesNumMap()
+ .entrySet()
+ .stream()
+ .map(
+ tableEntry -> {
+ final TsTable table =
+ DataNodeTableCache.getInstance()
+ .getTable(
+ PathUtils.unQualifyDatabaseName(
+ entry.getValue().getDatabaseFullPath()),
+ tableEntry.getKey());
+ return Objects.nonNull(table)
+ ? table.getMeasurementNum() * tableEntry.getValue()
+ : 0;
+ })
+ .reduce(0L, Long::sum)));
return timeSeriesNum;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/ISchemaRegionStatistics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/ISchemaRegionStatistics.java
index 188b5c72770..77442589500 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/ISchemaRegionStatistics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/ISchemaRegionStatistics.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.schemaengine.rescon;
+import java.util.Map;
+
public interface ISchemaRegionStatistics {
boolean isAllowToCreateNewSeries();
@@ -27,10 +29,12 @@ public interface ISchemaRegionStatistics {
int getSchemaRegionId();
- long getSeriesNumber(boolean includeView);
+ long getSeriesNumber(final boolean includeView);
long getDevicesNumber();
+ Map<String, Long> getTable2DevicesNumMap();
+
long getTableDevicesNumber(final String table);
int getTemplateActivatedNumber();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaRegionStatistics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaRegionStatistics.java
index 042b06cadca..ad29546a78b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaRegionStatistics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaRegionStatistics.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.schemaengine.rescon;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -97,6 +98,11 @@ public class MemSchemaRegionStatistics implements ISchemaRegionStatistics {
return devicesNumber.get();
}
+ @Override
+ public Map<String, Long> getTable2DevicesNumMap() {
+ return tableDeviceNumber;
+ }
+
@Override
public long getTableDevicesNumber(final String table) {
final Long deviceNumber = tableDeviceNumber.get(table);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
index 3829420ca58..014f92a0c59 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
@@ -77,6 +77,7 @@ public class TsTable {
private Map<String, String> props = null;
private transient int idNums = 0;
+ private transient int measurementNum = 0;
public TsTable(final String tableName) {
this.tableName = tableName;
@@ -112,6 +113,8 @@ public class TsTable {
if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.ID)) {
idNums++;
idColumnIndexMap.put(columnSchema.getColumnName(), idNums - 1);
+ } else if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.MEASUREMENT)) {
+ measurementNum++;
}
} finally {
readWriteLock.writeLock().unlock();
@@ -140,6 +143,9 @@ public class TsTable {
throw new SchemaExecutionException("Cannot remove an id column: " + columnName);
} else if (columnSchema != null) {
columnSchemaMap.remove(columnName);
+ if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.MEASUREMENT)) {
+ measurementNum--;
+ }
}
} finally {
readWriteLock.writeLock().unlock();
@@ -164,6 +170,15 @@ public class TsTable {
}
}
+ public int getMeasurementNum() {
+ readWriteLock.readLock().lock();
+ try {
+ return measurementNum;
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ }
+
public List<TsTableColumnSchema> getColumnList() {
readWriteLock.readLock().lock();
try {