This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fc62a59a40 [IOTDB-2958]create a simple schemaCache for datanode 
metadata (#5602)
fc62a59a40 is described below

commit fc62a59a401dce2043e36b62aa0be056632a84ee
Author: Yifu Zhou <[email protected]>
AuthorDate: Mon Apr 25 22:57:02 2022 +0800

    [IOTDB-2958]create a simple schemaCache for datanode metadata (#5602)
---
 .../resources/conf/iotdb-engine.properties         |   5 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  14 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +
 .../db/metadata/cache/DataNodeSchemaCache.java     | 186 +++++++++++++++++++++
 .../iotdb/db/metadata/cache/SchemaCacheEntity.java | 114 +++++++++++++
 .../db/metadata/cache/DataNodeSchemaCacheTest.java |  95 +++++++++++
 6 files changed, 420 insertions(+)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 46bda55e3a..ac905a0f53 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -1007,6 +1007,11 @@ timestamp_precision=ms
 # Datatype: int
 # schema_region_device_node_cache_size=10000
 
+# cache size for DataNode.
+# This cache is used to improve insert speed where each datanode has its 
metadata cache and can do path consistency check locally.
+# Datatype: int
+# datanode_schema_cache_size=10000
+
 ####################
 ### Schema File Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d1bdc6d3b9..527dd4e42b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -890,6 +890,12 @@ public class IoTDBConfig {
           ? Runtime.getRuntime().availableProcessors() / 4
           : 1;
 
+  /**
+   * Cache size of dataNodeSchemaCache in{@link
+   * org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache}.
+   */
+  private int dataNodeSchemaCacheSize = 10000;
+
   public float getUdfMemoryBudgetInMB() {
     return udfMemoryBudgetInMB;
   }
@@ -2815,4 +2821,12 @@ public class IoTDBConfig {
   public void setClusterMode(boolean isClusterMode) {
     this.isClusterMode = isClusterMode;
   }
+
+  public int getDataNodeSchemaCacheSize() {
+    return dataNodeSchemaCacheSize;
+  }
+
+  public void setDataNodeSchemaCacheSize(int dataNodeSchemaCacheSize) {
+    this.dataNodeSchemaCacheSize = dataNodeSchemaCacheSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d318b2c112..bbfd20b6c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -826,6 +826,12 @@ public class IoTDBDescriptor {
               "iotdb_server_encrypt_decrypt_provider_parameter",
               conf.getEncryptDecryptProviderParameter()));
 
+      conf.setDataNodeSchemaCacheSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "datanode_schema_cache_size",
+                  String.valueOf(conf.getDataNodeSchemaCacheSize()))));
+
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance()
           .getConfig()
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
new file mode 100644
index 0000000000..cc130ff120
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.analyze.FakeSchemaFetcherImpl;
+import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class takes the responsibility of metadata cache management of all 
DataRegions under
+ * StorageEngine
+ */
+public class DataNodeSchemaCache {
+  private static final Logger logger = 
LoggerFactory.getLogger(DataNodeSchemaCache.class);
+
+  private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private Cache<PartialPath, SchemaCacheEntity> schemaEntityCache;
+
+  // TODO use fakeSchemaFetcherImpl for test temporarily
+  private static final ISchemaFetcher schemaFetcher = new 
FakeSchemaFetcherImpl();
+
+  private DataNodeSchemaCache() {
+    schemaEntityCache =
+        
Caffeine.newBuilder().maximumSize(config.getDataNodeSchemaCacheSize()).build();
+  }
+
+  public static DataNodeSchemaCache getInstance() {
+    return DataNodeSchemaCache.DataNodeSchemaEntryCacheHolder.INSTANCE;
+  }
+
+  /** singleton pattern. */
+  private static class DataNodeSchemaEntryCacheHolder {
+    private static final DataNodeSchemaCache INSTANCE = new 
DataNodeSchemaCache();
+  }
+
+  /**
+   * Get SchemaEntity info without auto create schema
+   *
+   * @param devicePath should not be measurementPath or AlignedPath
+   * @param measurements
+   * @return timeseries partialPath and its SchemaEntity
+   */
+  public Map<PartialPath, SchemaCacheEntity> getSchemaEntity(
+      PartialPath devicePath, String[] measurements) {
+    Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap = new HashMap<>();
+    SchemaCacheEntity schemaCacheEntity;
+    List<String> fetchMeasurements = new ArrayList<>();
+    for (String measurement : measurements) {
+      PartialPath path = null;
+      try {
+        path = new PartialPath(devicePath.getFullPath(), measurement);
+      } catch (IllegalPathException e) {
+        logger.error(
+            "Create PartialPath:{} failed.",
+            devicePath.getFullPath() + TsFileConstant.PATH_SEPARATOR + 
measurement);
+      }
+      schemaCacheEntity = schemaEntityCache.getIfPresent(path);
+      if (schemaCacheEntity != null) {
+        schemaCacheEntityMap.put(path, schemaCacheEntity);
+      } else {
+        fetchMeasurements.add(measurement);
+      }
+    }
+    if (fetchMeasurements.size() != 0) {
+      SchemaTree schemaTree;
+      schemaTree = schemaFetcher.fetchSchema(new PathPatternTree(devicePath, 
fetchMeasurements));
+      // TODO need to construct schemaEntry from schemaTree
+
+    }
+    return schemaCacheEntityMap;
+  }
+
+  /**
+   * Get SchemaEntity info with auto create schema
+   *
+   * @param devicePath
+   * @param measurements
+   * @param tsDataTypes
+   * @param isAligned
+   * @return timeseries partialPath and its SchemaEntity
+   */
+  public Map<PartialPath, SchemaCacheEntity> getSchemaEntityWithAutoCreate(
+      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, 
boolean isAligned) {
+    Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap = new HashMap<>();
+    SchemaCacheEntity schemaCacheEntity;
+    List<String> fetchMeasurements = new ArrayList<>();
+    List<TSDataType> fetchTsDataTypes = new ArrayList<>();
+    for (int i = 0; i < measurements.length; i++) {
+      PartialPath path = null;
+      try {
+        path = new PartialPath(devicePath.getFullPath(), measurements[i]);
+      } catch (IllegalPathException e) {
+        logger.error(
+            "Create PartialPath:{} failed.",
+            devicePath.getFullPath() + TsFileConstant.PATH_SEPARATOR + 
measurements[i]);
+      }
+      schemaCacheEntity = schemaEntityCache.getIfPresent(path);
+      if (schemaCacheEntity != null) {
+        schemaCacheEntityMap.put(path, schemaCacheEntity);
+      } else {
+        fetchMeasurements.add(measurements[i]);
+        fetchTsDataTypes.add(tsDataTypes[i]);
+      }
+    }
+    if (fetchMeasurements.size() != 0) {
+      SchemaTree schemaTree;
+      schemaTree =
+          schemaFetcher.fetchSchemaWithAutoCreate(
+              devicePath,
+              fetchMeasurements.toArray(new String[fetchMeasurements.size()]),
+              fetchTsDataTypes.toArray(new 
TSDataType[fetchTsDataTypes.size()]),
+              isAligned);
+      // TODO need to construct schemaEntry from schemaTree
+
+      for (int i = 0; i < fetchMeasurements.size(); i++) {
+        try {
+          PartialPath path = new PartialPath(devicePath.getFullPath(), 
fetchMeasurements.get(i));
+          SchemaCacheEntity entity =
+              new SchemaCacheEntity(fetchMeasurements.get(i), 
fetchTsDataTypes.get(i), isAligned);
+          schemaEntityCache.put(path, entity);
+          schemaCacheEntityMap.put(path, entity);
+        } catch (IllegalPathException e) {
+          logger.error("Create PartialPath:{} failed.", 
devicePath.getFullPath());
+        }
+      }
+    }
+    return schemaCacheEntityMap;
+  }
+
+  /**
+   * For delete timeseries meatadata cache operation
+   *
+   * @param partialPath
+   * @return
+   */
+  public void invalidate(PartialPath partialPath) {
+    schemaEntityCache.invalidate(partialPath);
+  }
+
+  @TestOnly
+  public void cleanUp() {
+    schemaEntityCache.invalidateAll();
+    schemaEntityCache.cleanUp();
+  }
+
+  @TestOnly
+  protected Cache<PartialPath, SchemaCacheEntity> getSchemaEntityCache() {
+    return schemaEntityCache;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
new file mode 100644
index 0000000000..0e07b56e3f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+public class SchemaCacheEntity {
+  private final String schemaEntryId;
+
+  private TSDataType tsDataType;
+
+  private TSEncoding tsEncoding;
+
+  private CompressionType compressionType;
+
+  private String alias;
+
+  private boolean isAligned;
+
+  @TestOnly
+  public SchemaCacheEntity() {
+    this.schemaEntryId = "1";
+  }
+
+  public SchemaCacheEntity(String schemaEntryId, TSDataType tsDataType, 
boolean isAligned) {
+    this.schemaEntryId = schemaEntryId;
+    this.tsDataType = tsDataType;
+    this.isAligned = isAligned;
+    this.tsEncoding =
+        
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder());
+    this.compressionType = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+    this.alias = "";
+  }
+
+  public SchemaCacheEntity(
+      String schemaEntryId,
+      TSDataType tsDataType,
+      TSEncoding tsEncoding,
+      CompressionType compressionType,
+      String alias,
+      boolean isAligned) {
+    this.schemaEntryId = schemaEntryId;
+    this.tsDataType = tsDataType;
+    this.tsEncoding = tsEncoding;
+    this.compressionType = compressionType;
+    this.alias = alias;
+    this.isAligned = isAligned;
+  }
+
+  public String getSchemaEntryId() {
+    return schemaEntryId;
+  }
+
+  public TSDataType getTsDataType() {
+    return tsDataType;
+  }
+
+  public void setTsDataType(TSDataType tsDataType) {
+    this.tsDataType = tsDataType;
+  }
+
+  public TSEncoding getTsEncoding() {
+    return tsEncoding;
+  }
+
+  public void setTsEncoding(TSEncoding tsEncoding) {
+    this.tsEncoding = tsEncoding;
+  }
+
+  public CompressionType getCompressionType() {
+    return compressionType;
+  }
+
+  public void setCompressionType(CompressionType compressionType) {
+    this.compressionType = compressionType;
+  }
+
+  public String getAlias() {
+    return alias;
+  }
+
+  public void setAlias(String alias) {
+    this.alias = alias;
+  }
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  public void setAligned(boolean aligned) {
+    isAligned = aligned;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
 
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
new file mode 100644
index 0000000000..7a1cf1f32c
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class DataNodeSchemaCacheTest {
+  DataNodeSchemaCache dataNodeSchemaCache;
+
+  @Before
+  public void setUp() throws Exception {
+    dataNodeSchemaCache = DataNodeSchemaCache.getInstance();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    dataNodeSchemaCache.cleanUp();
+  }
+
+  @Test
+  public void testGetSchemaEntity() throws IllegalPathException {
+    PartialPath device1 = new PartialPath("root.sg1.d1");
+    String[] measurements = new String[3];
+    measurements[0] = "s1";
+    measurements[1] = "s2";
+    measurements[2] = "s3";
+    TSDataType[] tsDataTypes = new TSDataType[3];
+    tsDataTypes[0] = TSDataType.INT32;
+    tsDataTypes[1] = TSDataType.FLOAT;
+    tsDataTypes[2] = TSDataType.BOOLEAN;
+
+    Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap1 =
+        dataNodeSchemaCache.getSchemaEntityWithAutoCreate(
+            device1, measurements, tsDataTypes, false);
+    Assert.assertEquals(
+        TSDataType.INT32,
+        schemaCacheEntityMap1.get(new 
PartialPath("root.sg1.d1.s1")).getTsDataType());
+    Assert.assertEquals(
+        TSDataType.FLOAT,
+        schemaCacheEntityMap1.get(new 
PartialPath("root.sg1.d1.s2")).getTsDataType());
+    Assert.assertEquals(
+        TSDataType.BOOLEAN,
+        schemaCacheEntityMap1.get(new 
PartialPath("root.sg1.d1.s3")).getTsDataType());
+    Assert.assertEquals(3, 
dataNodeSchemaCache.getSchemaEntityCache().estimatedSize());
+
+    String[] otherMeasurements = new String[3];
+    otherMeasurements[0] = "s3";
+    otherMeasurements[1] = "s4";
+    otherMeasurements[2] = "s5";
+    TSDataType[] otherTsDataTypes = new TSDataType[3];
+    otherTsDataTypes[0] = TSDataType.BOOLEAN;
+    otherTsDataTypes[1] = TSDataType.TEXT;
+    otherTsDataTypes[2] = TSDataType.INT64;
+
+    Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap2 =
+        dataNodeSchemaCache.getSchemaEntityWithAutoCreate(
+            device1, otherMeasurements, otherTsDataTypes, false);
+    Assert.assertEquals(
+        TSDataType.BOOLEAN,
+        schemaCacheEntityMap2.get(new 
PartialPath("root.sg1.d1.s3")).getTsDataType());
+    Assert.assertEquals(
+        TSDataType.TEXT,
+        schemaCacheEntityMap2.get(new 
PartialPath("root.sg1.d1.s4")).getTsDataType());
+    Assert.assertEquals(
+        TSDataType.INT64,
+        schemaCacheEntityMap2.get(new 
PartialPath("root.sg1.d1.s5")).getTsDataType());
+    Assert.assertEquals(5, 
dataNodeSchemaCache.getSchemaEntityCache().estimatedSize());
+  }
+}

Reply via email to