This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fc62a59a40 [IOTDB-2958]create a simple schemaCache for datanode
metadata (#5602)
fc62a59a40 is described below
commit fc62a59a401dce2043e36b62aa0be056632a84ee
Author: Yifu Zhou <[email protected]>
AuthorDate: Mon Apr 25 22:57:02 2022 +0800
[IOTDB-2958]create a simple schemaCache for datanode metadata (#5602)
---
.../resources/conf/iotdb-engine.properties | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +
.../db/metadata/cache/DataNodeSchemaCache.java | 186 +++++++++++++++++++++
.../iotdb/db/metadata/cache/SchemaCacheEntity.java | 114 +++++++++++++
.../db/metadata/cache/DataNodeSchemaCacheTest.java | 95 +++++++++++
6 files changed, 420 insertions(+)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 46bda55e3a..ac905a0f53 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -1007,6 +1007,11 @@ timestamp_precision=ms
# Datatype: int
# schema_region_device_node_cache_size=10000
+# cache size for DataNode.
+# This cache is used to improve insert speed where each datanode has its
metadata cache and can do path consistency check locally.
+# Datatype: int
+# datanode_schema_cache_size=10000
+
####################
### Schema File Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d1bdc6d3b9..527dd4e42b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -890,6 +890,12 @@ public class IoTDBConfig {
? Runtime.getRuntime().availableProcessors() / 4
: 1;
+ /**
+ * Cache size of dataNodeSchemaCache in{@link
+ * org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache}.
+ */
+ private int dataNodeSchemaCacheSize = 10000;
+
public float getUdfMemoryBudgetInMB() {
return udfMemoryBudgetInMB;
}
@@ -2815,4 +2821,12 @@ public class IoTDBConfig {
public void setClusterMode(boolean isClusterMode) {
this.isClusterMode = isClusterMode;
}
+
+ public int getDataNodeSchemaCacheSize() {
+ return dataNodeSchemaCacheSize;
+ }
+
+ public void setDataNodeSchemaCacheSize(int dataNodeSchemaCacheSize) {
+ this.dataNodeSchemaCacheSize = dataNodeSchemaCacheSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d318b2c112..bbfd20b6c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -826,6 +826,12 @@ public class IoTDBDescriptor {
"iotdb_server_encrypt_decrypt_provider_parameter",
conf.getEncryptDecryptProviderParameter()));
+ conf.setDataNodeSchemaCacheSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "datanode_schema_cache_size",
+ String.valueOf(conf.getDataNodeSchemaCacheSize()))));
+
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance()
.getConfig()
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
new file mode 100644
index 0000000000..cc130ff120
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class takes the responsibility of metadata cache management of all
DataRegions under
+ * StorageEngine
+ */
+public class DataNodeSchemaCache {
+ private static final Logger logger =
LoggerFactory.getLogger(DataNodeSchemaCache.class);
+
+ private static IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+
+ private Cache<PartialPath, SchemaCacheEntity> schemaEntityCache;
+
+ // TODO use fakeSchemaFetcherImpl for test temporarily
+ private static final ISchemaFetcher schemaFetcher = new
FakeSchemaFetcherImpl();
+
+ private DataNodeSchemaCache() {
+ schemaEntityCache =
+
Caffeine.newBuilder().maximumSize(config.getDataNodeSchemaCacheSize()).build();
+ }
+
+ public static DataNodeSchemaCache getInstance() {
+ return DataNodeSchemaCache.DataNodeSchemaEntryCacheHolder.INSTANCE;
+ }
+
+ /** singleton pattern. */
+ private static class DataNodeSchemaEntryCacheHolder {
+ private static final DataNodeSchemaCache INSTANCE = new
DataNodeSchemaCache();
+ }
+
+ /**
+ * Get SchemaEntity info without auto create schema
+ *
+ * @param devicePath should not be measurementPath or AlignedPath
+ * @param measurements
+ * @return timeseries partialPath and its SchemaEntity
+ */
+ public Map<PartialPath, SchemaCacheEntity> getSchemaEntity(
+ PartialPath devicePath, String[] measurements) {
+ Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap = new HashMap<>();
+ SchemaCacheEntity schemaCacheEntity;
+ List<String> fetchMeasurements = new ArrayList<>();
+ for (String measurement : measurements) {
+ PartialPath path = null;
+ try {
+ path = new PartialPath(devicePath.getFullPath(), measurement);
+ } catch (IllegalPathException e) {
+ logger.error(
+ "Create PartialPath:{} failed.",
+ devicePath.getFullPath() + TsFileConstant.PATH_SEPARATOR +
measurement);
+ }
+ schemaCacheEntity = schemaEntityCache.getIfPresent(path);
+ if (schemaCacheEntity != null) {
+ schemaCacheEntityMap.put(path, schemaCacheEntity);
+ } else {
+ fetchMeasurements.add(measurement);
+ }
+ }
+ if (fetchMeasurements.size() != 0) {
+ SchemaTree schemaTree;
+ schemaTree = schemaFetcher.fetchSchema(new PathPatternTree(devicePath,
fetchMeasurements));
+ // TODO need to construct schemaEntry from schemaTree
+
+ }
+ return schemaCacheEntityMap;
+ }
+
+ /**
+ * Get SchemaEntity info with auto create schema
+ *
+ * @param devicePath
+ * @param measurements
+ * @param tsDataTypes
+ * @param isAligned
+ * @return timeseries partialPath and its SchemaEntity
+ */
+ public Map<PartialPath, SchemaCacheEntity> getSchemaEntityWithAutoCreate(
+ PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes,
boolean isAligned) {
+ Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap = new HashMap<>();
+ SchemaCacheEntity schemaCacheEntity;
+ List<String> fetchMeasurements = new ArrayList<>();
+ List<TSDataType> fetchTsDataTypes = new ArrayList<>();
+ for (int i = 0; i < measurements.length; i++) {
+ PartialPath path = null;
+ try {
+ path = new PartialPath(devicePath.getFullPath(), measurements[i]);
+ } catch (IllegalPathException e) {
+ logger.error(
+ "Create PartialPath:{} failed.",
+ devicePath.getFullPath() + TsFileConstant.PATH_SEPARATOR +
measurements[i]);
+ }
+ schemaCacheEntity = schemaEntityCache.getIfPresent(path);
+ if (schemaCacheEntity != null) {
+ schemaCacheEntityMap.put(path, schemaCacheEntity);
+ } else {
+ fetchMeasurements.add(measurements[i]);
+ fetchTsDataTypes.add(tsDataTypes[i]);
+ }
+ }
+ if (fetchMeasurements.size() != 0) {
+ SchemaTree schemaTree;
+ schemaTree =
+ schemaFetcher.fetchSchemaWithAutoCreate(
+ devicePath,
+ fetchMeasurements.toArray(new String[fetchMeasurements.size()]),
+ fetchTsDataTypes.toArray(new
TSDataType[fetchTsDataTypes.size()]),
+ isAligned);
+ // TODO need to construct schemaEntry from schemaTree
+
+ for (int i = 0; i < fetchMeasurements.size(); i++) {
+ try {
+ PartialPath path = new PartialPath(devicePath.getFullPath(),
fetchMeasurements.get(i));
+ SchemaCacheEntity entity =
+ new SchemaCacheEntity(fetchMeasurements.get(i),
fetchTsDataTypes.get(i), isAligned);
+ schemaEntityCache.put(path, entity);
+ schemaCacheEntityMap.put(path, entity);
+ } catch (IllegalPathException e) {
+ logger.error("Create PartialPath:{} failed.",
devicePath.getFullPath());
+ }
+ }
+ }
+ return schemaCacheEntityMap;
+ }
+
+ /**
+ * For delete timeseries meatadata cache operation
+ *
+ * @param partialPath
+ * @return
+ */
+ public void invalidate(PartialPath partialPath) {
+ schemaEntityCache.invalidate(partialPath);
+ }
+
+ @TestOnly
+ public void cleanUp() {
+ schemaEntityCache.invalidateAll();
+ schemaEntityCache.cleanUp();
+ }
+
+ @TestOnly
+ protected Cache<PartialPath, SchemaCacheEntity> getSchemaEntityCache() {
+ return schemaEntityCache;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
new file mode 100644
index 0000000000..0e07b56e3f
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+public class SchemaCacheEntity {
+ private final String schemaEntryId;
+
+ private TSDataType tsDataType;
+
+ private TSEncoding tsEncoding;
+
+ private CompressionType compressionType;
+
+ private String alias;
+
+ private boolean isAligned;
+
+ @TestOnly
+ public SchemaCacheEntity() {
+ this.schemaEntryId = "1";
+ }
+
+ public SchemaCacheEntity(String schemaEntryId, TSDataType tsDataType,
boolean isAligned) {
+ this.schemaEntryId = schemaEntryId;
+ this.tsDataType = tsDataType;
+ this.isAligned = isAligned;
+ this.tsEncoding =
+
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder());
+ this.compressionType =
TSFileDescriptor.getInstance().getConfig().getCompressor();
+ this.alias = "";
+ }
+
+ public SchemaCacheEntity(
+ String schemaEntryId,
+ TSDataType tsDataType,
+ TSEncoding tsEncoding,
+ CompressionType compressionType,
+ String alias,
+ boolean isAligned) {
+ this.schemaEntryId = schemaEntryId;
+ this.tsDataType = tsDataType;
+ this.tsEncoding = tsEncoding;
+ this.compressionType = compressionType;
+ this.alias = alias;
+ this.isAligned = isAligned;
+ }
+
+ public String getSchemaEntryId() {
+ return schemaEntryId;
+ }
+
+ public TSDataType getTsDataType() {
+ return tsDataType;
+ }
+
+ public void setTsDataType(TSDataType tsDataType) {
+ this.tsDataType = tsDataType;
+ }
+
+ public TSEncoding getTsEncoding() {
+ return tsEncoding;
+ }
+
+ public void setTsEncoding(TSEncoding tsEncoding) {
+ this.tsEncoding = tsEncoding;
+ }
+
+ public CompressionType getCompressionType() {
+ return compressionType;
+ }
+
+ public void setCompressionType(CompressionType compressionType) {
+ this.compressionType = compressionType;
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public void setAlias(String alias) {
+ this.alias = alias;
+ }
+
+ public boolean isAligned() {
+ return isAligned;
+ }
+
+ public void setAligned(boolean aligned) {
+ isAligned = aligned;
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
new file mode 100644
index 0000000000..7a1cf1f32c
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class DataNodeSchemaCacheTest {
+ DataNodeSchemaCache dataNodeSchemaCache;
+
+ @Before
+ public void setUp() throws Exception {
+ dataNodeSchemaCache = DataNodeSchemaCache.getInstance();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dataNodeSchemaCache.cleanUp();
+ }
+
+ @Test
+ public void testGetSchemaEntity() throws IllegalPathException {
+ PartialPath device1 = new PartialPath("root.sg1.d1");
+ String[] measurements = new String[3];
+ measurements[0] = "s1";
+ measurements[1] = "s2";
+ measurements[2] = "s3";
+ TSDataType[] tsDataTypes = new TSDataType[3];
+ tsDataTypes[0] = TSDataType.INT32;
+ tsDataTypes[1] = TSDataType.FLOAT;
+ tsDataTypes[2] = TSDataType.BOOLEAN;
+
+ Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap1 =
+ dataNodeSchemaCache.getSchemaEntityWithAutoCreate(
+ device1, measurements, tsDataTypes, false);
+ Assert.assertEquals(
+ TSDataType.INT32,
+ schemaCacheEntityMap1.get(new
PartialPath("root.sg1.d1.s1")).getTsDataType());
+ Assert.assertEquals(
+ TSDataType.FLOAT,
+ schemaCacheEntityMap1.get(new
PartialPath("root.sg1.d1.s2")).getTsDataType());
+ Assert.assertEquals(
+ TSDataType.BOOLEAN,
+ schemaCacheEntityMap1.get(new
PartialPath("root.sg1.d1.s3")).getTsDataType());
+ Assert.assertEquals(3,
dataNodeSchemaCache.getSchemaEntityCache().estimatedSize());
+
+ String[] otherMeasurements = new String[3];
+ otherMeasurements[0] = "s3";
+ otherMeasurements[1] = "s4";
+ otherMeasurements[2] = "s5";
+ TSDataType[] otherTsDataTypes = new TSDataType[3];
+ otherTsDataTypes[0] = TSDataType.BOOLEAN;
+ otherTsDataTypes[1] = TSDataType.TEXT;
+ otherTsDataTypes[2] = TSDataType.INT64;
+
+ Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap2 =
+ dataNodeSchemaCache.getSchemaEntityWithAutoCreate(
+ device1, otherMeasurements, otherTsDataTypes, false);
+ Assert.assertEquals(
+ TSDataType.BOOLEAN,
+ schemaCacheEntityMap2.get(new
PartialPath("root.sg1.d1.s3")).getTsDataType());
+ Assert.assertEquals(
+ TSDataType.TEXT,
+ schemaCacheEntityMap2.get(new
PartialPath("root.sg1.d1.s4")).getTsDataType());
+ Assert.assertEquals(
+ TSDataType.INT64,
+ schemaCacheEntityMap2.get(new
PartialPath("root.sg1.d1.s5")).getTsDataType());
+ Assert.assertEquals(5,
dataNodeSchemaCache.getSchemaEntityCache().estimatedSize());
+ }
+}