This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch xkf_id_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 72642fb4270410e328e669e222fc2a4c28ec6745
Author: 151250176 <[email protected]>
AuthorDate: Tue Dec 7 17:30:24 2021 +0800

    add test
---
 .../engine/storagegroup/StorageGroupProcessor.java |  75 ++---
 .../org/apache/iotdb/db/metadata/MManager.java     |  75 +++--
 .../apache/iotdb/db/metadata/id_table/IDTable.java | 268 ++++++++++++++----
 .../db/metadata/id_table/entry/SchemaEntry.java    |   9 +
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |  18 +-
 .../iotdb/db/metadata/id_table/IDManagerTest.java  |   5 -
 .../iotdb/db/metadata/id_table/IDTableTest.java    | 315 +++++++++++++++++++++
 .../metadata/id_table/entry/SchemaEntryTest.java   |  26 +-
 8 files changed, 650 insertions(+), 141 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 41edad5..fbcd087 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,39 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static 
org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
+import static 
org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
+import static 
org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -56,6 +89,7 @@ import 
org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.id_table.IDTable;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -83,44 +117,9 @@ import 
org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static 
org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
-import static 
org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
-import static 
org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in 
which there is only one
  * TsFileProcessor in the working status. <br>
@@ -291,6 +290,8 @@ public class StorageGroupProcessor {
 
   public static final long COMPACTION_TASK_SUBMIT_DELAY = 20L * 1000L;
 
+  private IDTable idTable = new IDTable();
+
   /** get the direct byte buffer from pool, each fetch contains two ByteBuffer 
*/
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
@@ -3303,4 +3304,8 @@ public class StorageGroupProcessor {
   public ScheduledExecutorService getTimedCompactionScheduleTask() {
     return timedCompactionScheduleTask;
   }
+
+  public IDTable getIdTable() {
+    return idTable;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 433bd31..9a8adbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,12 +18,33 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
@@ -37,6 +58,7 @@ import 
org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.metadata.TemplateIsInUseException;
 import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException;
+import org.apache.iotdb.db.metadata.id_table.IDTable;
 import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
 import org.apache.iotdb.db.metadata.logfile.MLogReader;
 import org.apache.iotdb.db.metadata.logfile.MLogWriter;
@@ -94,31 +116,9 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
 /**
  * This class takes the responsibility of serialization of all the metadata 
info and persistent it
  * into files. This class contains all the interfaces to modify the metadata 
for delta system. All
@@ -382,12 +382,12 @@ public class MManager {
     switch (plan.getOperatorType()) {
       case CREATE_TIMESERIES:
         CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) 
plan;
-        createTimeseries(createTimeSeriesPlan, 
createTimeSeriesPlan.getTagOffset());
+        createTimeseriesEntry(createTimeSeriesPlan, 
createTimeSeriesPlan.getTagOffset());
         break;
       case CREATE_ALIGNED_TIMESERIES:
         CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
             (CreateAlignedTimeSeriesPlan) plan;
-        createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+        createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan);
         break;
       case DELETE_TIMESERIES:
         DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) 
plan;
@@ -456,6 +456,20 @@ public class MManager {
     createTimeseries(plan, -1);
   }
 
+  public void createTimeseriesEntry(CreateTimeSeriesPlan plan, long offset)
+      throws MetadataException {
+    createTimeseries(plan, offset);
+
+    // update id table
+    try {
+      IDTable idTable =
+          
StorageEngine.getInstance().getProcessor(plan.getPath().getDevicePath()).getIdTable();
+      idTable.createTimeseries(plan);
+    } catch (StorageEngineException e) {
+      logger.error("get id table error");
+    }
+  }
+
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws 
MetadataException {
     if (!allowToCreateNewSeries) {
@@ -558,6 +572,19 @@ public class MManager {
             prefixPath, measurements, dataTypes, encodings, compressors, 
null));
   }
 
+  public void createAlignedTimeSeriesEntry(CreateAlignedTimeSeriesPlan plan)
+      throws MetadataException {
+    createAlignedTimeSeries(plan);
+
+    // update id table
+    try {
+      IDTable idTable = 
StorageEngine.getInstance().getProcessor(plan.getPrefixPath()).getIdTable();
+      idTable.createAlignedTimeseries(plan);
+    } catch (StorageEngineException e) {
+      logger.error("get id table error");
+    }
+  }
+
   /**
    * create aligned timeseries
    *
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
index 2f4ef22..ff9f84d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java
@@ -19,18 +19,37 @@
 
 package org.apache.iotdb.db.metadata.id_table;
 
+import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.metadata.id_table.entry.DeviceEntry;
+import org.apache.iotdb.db.metadata.id_table.entry.DeviceIDFactory;
 import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID;
+import org.apache.iotdb.db.metadata.id_table.entry.InsertMeasurementMNode;
 import org.apache.iotdb.db.metadata.id_table.entry.SchemaEntry;
 import org.apache.iotdb.db.metadata.id_table.entry.TimeseriesID;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +67,8 @@ public class IDTable {
   private Map<IDeviceID, DeviceEntry>[] idTables;
   /** disk schema manager to manage disk schema entry */
   private DiskSchemaManager diskSchemaManager;
+  /** iotdb config */
+  protected static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
   public IDTable() {
     idTables = new Map[NUM_OF_SLOTS];
@@ -56,92 +77,140 @@ public class IDTable {
     }
   }
 
-  /**
-   * get device id from device path and check is aligned,
-   *
-   * @param seriesKey path of the time series
-   * @param isAligned whether the insert plan is aligned
-   * @return reused device id of the timeseries
-   */
-  public synchronized IDeviceID getDeviceID(PartialPath seriesKey, boolean 
isAligned)
+  public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan 
plan)
       throws MetadataException {
-    TimeseriesID timeseriesID = new TimeseriesID(seriesKey);
-    IDeviceID deviceID = timeseriesID.getDeviceID();
-    int slot = calculateSlot(deviceID);
-
-    DeviceEntry deviceEntry = idTables[slot].get(deviceID);
-    // new device
-    if (deviceEntry == null) {
-      deviceEntry = new DeviceEntry(deviceID);
-      deviceEntry.setAligned(isAligned);
-      idTables[slot].put(deviceID, deviceEntry);
+    DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true);
 
-      return deviceID;
-    }
-
-    // check aligned
-    if (deviceEntry.isAligned() != isAligned) {
-      throw new MetadataException(
-          String.format(
-              "Timeseries under path [%s]'s align value is [%b], which is not 
consistent with insert plan",
-              seriesKey.getDevice(), deviceEntry.isAligned()));
+    for (int i = 0; i < plan.getMeasurements().size(); i++) {
+      SchemaEntry schemaEntry =
+          new SchemaEntry(
+              plan.getDataTypes().get(i), plan.getEncodings().get(i), 
plan.getCompressors().get(i));
+      deviceEntry.putSchemaEntry(plan.getMeasurements().get(i), schemaEntry);
     }
+  }
 
-    // reuse device id in map
-    return deviceEntry.getDeviceID();
+  public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws 
MetadataException {
+    DeviceEntry deviceEntry = getDeviceEntry(plan.getPath().getDevicePath(), 
false);
+    SchemaEntry schemaEntry =
+        new SchemaEntry(plan.getDataType(), plan.getEncoding(), 
plan.getCompressor());
+    deviceEntry.putSchemaEntry(plan.getPath().getMeasurement(), schemaEntry);
   }
 
   /**
    * check whether a time series is exist if exist, check the type consistency 
if not exist, call
    * MManager to create it
    *
-   * @param seriesKey path of the time series
-   * @param dataType type of the time series
    * @return measurement MNode of the time series or null if type is not match
    */
-  public synchronized IMeasurementMNode checkOrCreateIfNotExist(
-      PartialPath seriesKey, TSDataType dataType) {
-    TimeseriesID timeseriesID = new TimeseriesID(seriesKey);
-    IDeviceID deviceID = timeseriesID.getDeviceID();
-    int slot = calculateSlot(deviceID);
+  public synchronized IMeasurementMNode getOrCreateMeasurementIfNotExist(
+      DeviceEntry deviceEntry, InsertPlan plan, int loc) throws 
MetadataException {
+    String measurementName = plan.getMeasurements()[loc];
+    PartialPath seriesKey = new PartialPath(plan.getDeviceId().toString(), 
measurementName);
 
-    DeviceEntry deviceEntry =
-        idTables[slot].computeIfAbsent(deviceID, id -> new 
DeviceEntry(deviceID));
-    SchemaEntry schemaEntry = 
deviceEntry.getSchemaEntry(timeseriesID.getMeasurement());
+    SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(measurementName);
 
     // if not exist, we create it
     if (schemaEntry == null) {
-      schemaEntry = new SchemaEntry(dataType);
+      if (!config.isAutoCreateSchemaEnabled()) {
+        throw new PathNotExistException(seriesKey.toString());
+      }
 
-      // 1. create new timeseries in mmanager
+      // create new timeseries in mmanager
       try {
-        MManager.getInstance()
-            .createTimeseries(
-                seriesKey,
-                dataType,
-                schemaEntry.getTSEncoding(),
-                schemaEntry.getCompressionType(),
-                null);
+        if (plan.isAligned()) {
+          List<TSEncoding> encodings = new ArrayList<>();
+          List<CompressionType> compressors = new ArrayList<>();
+          for (TSDataType dataType : plan.getDataTypes()) {
+            encodings.add(getDefaultEncoding(dataType));
+            
compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
+          }
+
+          CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
+              new CreateAlignedTimeSeriesPlan(
+                  plan.getDeviceId(),
+                  Arrays.asList(plan.getMeasurements()),
+                  Arrays.asList(plan.getDataTypes()),
+                  encodings,
+                  compressors,
+                  null);
+
+          
IoTDB.metaManager.createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan);
+        } else {
+          CreateTimeSeriesPlan createTimeSeriesPlan =
+              new CreateTimeSeriesPlan(
+                  seriesKey,
+                  plan.getDataTypes()[loc],
+                  getDefaultEncoding(plan.getDataTypes()[loc]),
+                  TSFileDescriptor.getInstance().getConfig().getCompressor(),
+                  null,
+                  null,
+                  null,
+                  null);
+
+          IoTDB.metaManager.createTimeseriesEntry(createTimeSeriesPlan, -1);
+        }
       } catch (MetadataException e) {
-        logger.error("create timeseries failed, path is:" + seriesKey + " type 
is: " + dataType);
+        logger.error("create timeseries failed, path is:" + seriesKey);
       }
 
-      // 2. insert this schema into id table
-      deviceEntry.putSchemaEntry(timeseriesID.getMeasurement(), schemaEntry);
-
-      return null;
+      schemaEntry = deviceEntry.getSchemaEntry(measurementName);
     }
 
-    // type mismatch, we return null and this will be handled by upper level
-    if (!schemaEntry.getTSDataType().equals(dataType)) {
-      return null;
+    return new InsertMeasurementMNode(measurementName, schemaEntry);
+  }
+
+  public synchronized IDeviceID getSeriesSchemas(InsertPlan plan) throws 
MetadataException {
+    PartialPath devicePath = plan.getDeviceId();
+    String[] measurementList = plan.getMeasurements();
+    IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
+
+    // 1. get device entry and check align
+    DeviceEntry deviceEntry = getDeviceEntry(devicePath, plan.isAligned());
+
+    // 2. get schema of each measurement
+    for (int i = 0; i < measurementList.length; i++) {
+      try {
+        // get MeasurementMNode, auto create if absent
+        try {
+          IMeasurementMNode measurementMNode =
+              getOrCreateMeasurementIfNotExist(deviceEntry, plan, i);
+
+          checkDataTypeMatch(plan, i, measurementMNode.getSchema().getType());
+          measurementMNodes[i] = measurementMNode;
+        } catch (DataTypeMismatchException mismatchException) {
+          if (!config.isEnablePartialInsert()) {
+            throw mismatchException;
+          } else {
+            // mark failed measurement
+            plan.markFailedMeasurementInsertion(i, mismatchException);
+          }
+        }
+      } catch (MetadataException e) {
+        if (IoTDB.isClusterMode()) {
+          logger.debug(
+              "meet error when check {}.{}, message: {}",
+              devicePath,
+              measurementList[i],
+              e.getMessage());
+        } else {
+          logger.warn(
+              "meet error when check {}.{}, message: {}",
+              devicePath,
+              measurementList[i],
+              e.getMessage());
+        }
+        if (config.isEnablePartialInsert()) {
+          // mark failed measurement
+          plan.markFailedMeasurementInsertion(i, e);
+        } else {
+          throw e;
+        }
+      }
     }
 
-    return null;
+    return deviceEntry.getDeviceID();
   }
 
-  public synchronized IMeasurementMNode getSeriesSchemas(InsertPlan 
insertPlan) {}
-
   /**
    * update latest flushed time of one timeseries
    *
@@ -167,13 +236,47 @@ public class IDTable {
   }
 
   /**
+   * get the device entry of a device path and check that its aligned flag matches
+   *
+   * @param deviceName device name of the time series
+   * @param isAligned whether the insert plan is aligned
+   * @return device entry of the timeseries
+   */
+  private DeviceEntry getDeviceEntry(PartialPath deviceName, boolean isAligned)
+      throws MetadataException {
+    IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
+    int slot = calculateSlot(deviceID);
+
+    DeviceEntry deviceEntry = idTables[slot].get(deviceID);
+    // new device
+    if (deviceEntry == null) {
+      deviceEntry = new DeviceEntry(deviceID);
+      deviceEntry.setAligned(isAligned);
+      idTables[slot].put(deviceID, deviceEntry);
+
+      return deviceEntry;
+    }
+
+    // check aligned
+    if (deviceEntry.isAligned() != isAligned) {
+      throw new MetadataException(
+          String.format(
+              "Timeseries under path [%s]'s align value is [%b], which is not 
consistent with insert plan",
+              deviceName, deviceEntry.isAligned()));
+    }
+
+    // reuse device entry in map
+    return deviceEntry;
+  }
+
+  /**
    * calculate slot that this deviceID should in
    *
    * @param deviceID device id
    * @return slot number
    */
   private int calculateSlot(IDeviceID deviceID) {
-    return deviceID.hashCode() % NUM_OF_SLOTS;
+    return Math.abs(deviceID.hashCode()) % NUM_OF_SLOTS;
   }
 
   /**
@@ -201,4 +304,47 @@ public class IDTable {
 
     return schemaEntry;
   }
+
+  // from MManager
+  private void checkDataTypeMatch(InsertPlan plan, int loc, TSDataType 
dataType)
+      throws MetadataException {
+    //    TSDataType insertDataType;
+    //    if (plan instanceof InsertRowPlan) {
+    //      if (!((InsertRowPlan) plan).isNeedInferType()) {
+    //        // only when InsertRowPlan's values is object[], we should check 
type
+    //        insertDataType = getTypeInLoc(plan, loc);
+    //      } else {
+    //        insertDataType = dataType;
+    //      }
+    //    } else {
+    //      insertDataType = getTypeInLoc(plan, loc);
+    //    }
+    TSDataType insertDataType = plan.getDataTypes()[loc];
+    if (dataType != insertDataType) {
+      String measurement = plan.getMeasurements()[loc];
+      logger.warn(
+          "DataType mismatch, Insert measurement {} type {}, metadata tree 
type {}",
+          measurement,
+          insertDataType,
+          dataType);
+      throw new DataTypeMismatchException(measurement, insertDataType, 
dataType);
+    }
+  }
+
+  /** get the dataType of the measurement at loc in the plan; only InsertRowPlan and 
InsertTabletPlan are supported */
+  private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws 
MetadataException {
+    TSDataType dataType;
+    if (plan instanceof InsertRowPlan) {
+      InsertRowPlan tPlan = (InsertRowPlan) plan;
+      dataType =
+          TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], 
tPlan.isNeedInferType());
+    } else if (plan instanceof InsertTabletPlan) {
+      dataType = (plan).getDataTypes()[loc];
+    } else {
+      throw new MetadataException(
+          String.format(
+              "Only support insert and insertTablet, plan is [%s]", 
plan.getOperatorType()));
+    }
+    return dataType;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java
index 7ecec8f..7daaf8c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java
@@ -56,6 +56,15 @@ public class SchemaEntry implements ILastCacheContainer {
     flushTime = Long.MIN_VALUE;
   }
 
+  public SchemaEntry(TSDataType dataType, TSEncoding encoding, CompressionType 
compressionType) {
+    schema |= dataType.serialize();
+    schema |= (((long) encoding.serialize()) << 8);
+    schema |= (((long) compressionType.serialize()) << 16);
+
+    lastTime = Long.MIN_VALUE;
+    flushTime = Long.MIN_VALUE;
+  }
+
   /**
    * get ts data type from long value of schema
    *
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 1d0f5af..a31b459 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -19,17 +19,17 @@
 
 package org.apache.iotdb.db.qp.physical.crud;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 public abstract class InsertPlan extends PhysicalPlan {
 
   protected PartialPath deviceId;
@@ -40,6 +40,8 @@ public abstract class InsertPlan extends PhysicalPlan {
   // get from MManager
   protected IMeasurementMNode[] measurementMNodes;
 
+  protected IDeviceID deviceID;
+
   // record the failed measurements, their reasons, and positions in 
"measurements"
   List<String> failedMeasurements;
   private List<Exception> failedExceptions;
@@ -187,4 +189,12 @@ public abstract class InsertPlan extends PhysicalPlan {
       }
     }
   }
+
+  public IDeviceID getDeviceID() {
+    return deviceID;
+  }
+
+  public void setDeviceID(IDeviceID deviceID) {
+    this.deviceID = deviceID;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDManagerTest.java 
b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDManagerTest.java
deleted file mode 100644
index 585a974..0000000
--- 
a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDManagerTest.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.iotdb.db.metadata.id_table;
-
-public class IDManagerTest {
-
-}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java 
b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
index 0c8f5f2..fbdc776 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java
@@ -1,5 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.db.metadata.id_table;
 
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
 public class IDTableTest {
 
+  private CompressionType compressionType;
+
+  @Before
+  public void setUp() {
+    compressionType = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testCreateAlignedTimeseriesAndInsert() {
+    MManager manager = IoTDB.metaManager;
+
+    try {
+      manager.setStorageGroup(new PartialPath("root.laptop"));
+      CreateAlignedTimeSeriesPlan plan =
+          new CreateAlignedTimeSeriesPlan(
+              new PartialPath("root.laptop.d1.aligned_device"),
+              Arrays.asList("s1", "s2", "s3"),
+              Arrays.asList(
+                  TSDataType.valueOf("FLOAT"),
+                  TSDataType.valueOf("INT64"),
+                  TSDataType.valueOf("INT32")),
+              Arrays.asList(
+                  TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), 
TSEncoding.valueOf("RLE")),
+              Arrays.asList(compressionType, compressionType, compressionType),
+              null);
+
+      manager.createAlignedTimeSeriesEntry(plan);
+
+      IDTable idTable =
+          StorageEngine.getInstance().getProcessor(new 
PartialPath("root.laptop")).getIdTable();
+
+      // construct an insertRowPlan whose data types match the created schema
+      long time = 1L;
+      TSDataType[] dataTypes =
+          new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, 
TSDataType.INT32};
+
+      String[] columns = new String[3];
+      columns[0] = 2.0 + "";
+      columns[1] = 10000 + "";
+      columns[2] = 100 + "";
+
+      InsertRowPlan insertRowPlan =
+          new InsertRowPlan(
+              new PartialPath("root.laptop.d1.aligned_device"),
+              time,
+              new String[] {"s1", "s2", "s3"},
+              dataTypes,
+              columns,
+              true);
+      insertRowPlan.setMeasurementMNodes(
+          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+
+      idTable.getSeriesSchemas(insertRowPlan);
+
+      // second plan: s2 was created as INT64 but is inserted as DOUBLE (type mismatch)
+      dataTypes = new TSDataType[] {TSDataType.FLOAT, TSDataType.DOUBLE, 
TSDataType.INT32};
+      InsertRowPlan insertRowPlan2 =
+          new InsertRowPlan(
+              new PartialPath("root.laptop.d1.aligned_device"),
+              time,
+              new String[] {"s1", "s2", "s3"},
+              dataTypes,
+              columns,
+              true);
+      insertRowPlan2.setMeasurementMNodes(
+          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+
+      // partial insert is disabled below, so getSeriesSchemas should throw on the mismatch
+      try {
+        
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
+        idTable.getSeriesSchemas(insertRowPlan2);
+        fail();
+      } catch (Exception e) {
+
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreateAlignedTimeseriesAndInsertNotAlignedData() {
+    MManager manager = IoTDB.metaManager;
+
+    try {
+      manager.setStorageGroup(new PartialPath("root.laptop"));
+      CreateAlignedTimeSeriesPlan plan =
+          new CreateAlignedTimeSeriesPlan(
+              new PartialPath("root.laptop.d1.aligned_device"),
+              Arrays.asList("s1", "s2", "s3"),
+              Arrays.asList(
+                  TSDataType.valueOf("FLOAT"),
+                  TSDataType.valueOf("INT64"),
+                  TSDataType.valueOf("INT32")),
+              Arrays.asList(
+                  TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), 
TSEncoding.valueOf("RLE")),
+              Arrays.asList(compressionType, compressionType, compressionType),
+              null);
+
+      manager.createAlignedTimeSeriesEntry(plan);
+
+      IDTable idTable =
+          StorageEngine.getInstance().getProcessor(new 
PartialPath("root.laptop")).getIdTable();
+
+      // construct an insertRowPlan whose data types match the created schema
+      long time = 1L;
+      TSDataType[] dataTypes =
+          new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, 
TSDataType.INT32};
+
+      String[] columns = new String[3];
+      columns[0] = 2.0 + "";
+      columns[1] = 10000 + "";
+      columns[2] = 100 + "";
+
+      InsertRowPlan insertRowPlan =
+          new InsertRowPlan(
+              new PartialPath("root.laptop.d1.aligned_device"),
+              time,
+              new String[] {"s1", "s2", "s3"},
+              dataTypes,
+              columns,
+              true);
+      insertRowPlan.setMeasurementMNodes(
+          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+
+      // call IDTable.getSeriesSchemas
+      IDeviceID deviceID = idTable.getSeriesSchemas(insertRowPlan);
+      // assertEquals(3, 
manager.getAllTimeseriesCount(node.getPartialPath().concatNode("**")));
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  //  @Test
+  //  public void testCreateAlignedTimeseriesAndInsertWithNotAlignedData() {
+  //    MManager manager = IoTDB.metaManager;
+  //    try {
+  //      manager.setStorageGroup(new PartialPath("root.laptop"));
+  //      manager.createAlignedTimeSeries(
+  //          new PartialPath("root.laptop.d1.aligned_device"),
+  //          Arrays.asList("s1", "s2", "s3"),
+  //          Arrays.asList(
+  //              TSDataType.valueOf("FLOAT"),
+  //              TSDataType.valueOf("INT64"),
+  //              TSDataType.valueOf("INT32")),
+  //          Arrays.asList(
+  //              TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), 
TSEncoding.valueOf("RLE")),
+  //          Arrays.asList(compressionType, compressionType, 
compressionType));
+  //
+  //      // construct an insertRowPlan with mismatched data type
+  //      long time = 1L;
+  //      TSDataType[] dataTypes =
+  //          new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, 
TSDataType.INT32};
+  //
+  //      String[] columns = new String[3];
+  //      columns[0] = "1.0";
+  //      columns[1] = "2";
+  //      columns[2] = "3";
+  //
+  //      InsertRowPlan insertRowPlan =
+  //          new InsertRowPlan(
+  //              new PartialPath("root.laptop.d1.aligned_device"),
+  //              time,
+  //              new String[] {"s1", "s2", "s3"},
+  //              dataTypes,
+  //              columns,
+  //              false);
+  //      insertRowPlan.setMeasurementMNodes(
+  //          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+  //
+  //      // call getSeriesSchemasAndReadLockDevice
+  //      manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
+  //    } catch (Exception e) {
+  //      e.printStackTrace();
+  //      Assert.assertEquals(
+  //          "Timeseries under path [root.laptop.d1.aligned_device] is 
aligned , please
+  // setInsertPlan.isAligned() = true",
+  //          e.getMessage());
+  //    }
+  //  }
+  //
+  //  @Test
+  //  public void testCreateTimeseriesAndInsertWithMismatchDataType() {
+  //    MManager manager = IoTDB.metaManager;
+  //    try {
+  //      manager.setStorageGroup(new PartialPath("root.laptop"));
+  //      manager.createTimeseries(
+  //          new PartialPath("root.laptop.d1.s0"),
+  //          TSDataType.valueOf("INT32"),
+  //          TSEncoding.valueOf("RLE"),
+  //          compressionType,
+  //          Collections.emptyMap());
+  //
+  //      // construct an insertRowPlan with mismatched data type
+  //      long time = 1L;
+  //      TSDataType[] dataTypes = new TSDataType[] {TSDataType.FLOAT};
+  //
+  //      String[] columns = new String[1];
+  //      columns[0] = 2.0 + "";
+  //
+  //      InsertRowPlan insertRowPlan =
+  //          new InsertRowPlan(
+  //              new PartialPath("root.laptop.d1"), time, new String[] 
{"s0"}, dataTypes, columns);
+  //      insertRowPlan.setMeasurementMNodes(
+  //          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+  //
+  //      // call getSeriesSchemasAndReadLockDevice
+  //      IMNode node = 
manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
+  //      assertEquals(1, 
manager.getAllTimeseriesCount(node.getPartialPath().concatNode("**")));
+  //      assertNull(insertRowPlan.getMeasurementMNodes()[0]);
+  //      assertEquals(1, insertRowPlan.getFailedMeasurementNumber());
+  //
+  //    } catch (Exception e) {
+  //      e.printStackTrace();
+  //      fail(e.getMessage());
+  //    }
+  //  }
+  //
+  //  @Test
+  //  public void testCreateTimeseriesAndInsertWithAlignedData() {
+  //    MManager manager = IoTDB.metaManager;
+  //    try {
+  //      manager.setStorageGroup(new PartialPath("root.laptop"));
+  //      manager.createTimeseries(
+  //          new PartialPath("root.laptop.d1.aligned_device.s1"),
+  //          TSDataType.valueOf("INT32"),
+  //          TSEncoding.valueOf("RLE"),
+  //          compressionType,
+  //          Collections.emptyMap());
+  //      manager.createTimeseries(
+  //          new PartialPath("root.laptop.d1.aligned_device.s2"),
+  //          TSDataType.valueOf("INT64"),
+  //          TSEncoding.valueOf("RLE"),
+  //          compressionType,
+  //          Collections.emptyMap());
+  //
+  //      // construct an insertRowPlan with mismatched data type
+  //      long time = 1L;
+  //      TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, 
TSDataType.INT64};
+  //
+  //      String[] columns = new String[2];
+  //      columns[0] = "1";
+  //      columns[1] = "2";
+  //
+  //      InsertRowPlan insertRowPlan =
+  //          new InsertRowPlan(
+  //              new PartialPath("root.laptop.d1.aligned_device"),
+  //              time,
+  //              new String[] {"s1", "s2"},
+  //              dataTypes,
+  //              columns,
+  //              true);
+  //      insertRowPlan.setMeasurementMNodes(
+  //          new IMeasurementMNode[insertRowPlan.getMeasurements().length]);
+  //
+  //      // call getSeriesSchemasAndReadLockDevice
+  //      manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
+  //    } catch (Exception e) {
+  //      e.printStackTrace();
+  //      Assert.assertEquals(
+  //          "Timeseries under path [root.laptop.d1.aligned_device] is not 
aligned , please set
+  // InsertPlan.isAligned() = false",
+  //          e.getMessage());
+  //    }
+  //  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java
 
b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java
index 34f490b..e5c83f8 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java
@@ -25,7 +25,8 @@ import static org.junit.Assert.assertEquals;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.junit.Test;
 
 public class SchemaEntryTest {
@@ -53,19 +54,20 @@ public class SchemaEntryTest {
       assertEquals(schemaEntry.getFlushTime(), 100);
 
       // last cache
-      Object o1 = new Object();
-      Object o2 = new Object();
-      schemaEntry.updateLastCache(new Pair<>(100L, o1));
-      assertEquals(schemaEntry.getLastValue(), o1);
-      assertEquals(schemaEntry.getLastTime(), 100L);
+      schemaEntry.updateCachedLast(
+          new TimeValuePair(100L, new TsPrimitiveType.TsLong(1L)), false, 0L);
+      assertEquals(new TsPrimitiveType.TsLong(1L), schemaEntry.getLastValue());
+      assertEquals(100L, schemaEntry.getLastTime());
 
-      schemaEntry.updateLastCache(new Pair<>(90L, o2));
-      assertEquals(schemaEntry.getLastValue(), o1);
-      assertEquals(schemaEntry.getLastTime(), 100L);
+      schemaEntry.updateCachedLast(
+          new TimeValuePair(90L, new TsPrimitiveType.TsLong(2L)), false, 0L);
+      assertEquals(new TsPrimitiveType.TsLong(1L), schemaEntry.getLastValue());
+      assertEquals(100L, schemaEntry.getLastTime());
 
-      schemaEntry.updateLastCache(new Pair<>(110L, o2));
-      assertEquals(schemaEntry.getLastValue(), o2);
-      assertEquals(schemaEntry.getLastTime(), 110L);
+      schemaEntry.updateCachedLast(
+          new TimeValuePair(110L, new TsPrimitiveType.TsLong(2L)), false, 0L);
+      assertEquals(new TsPrimitiveType.TsLong(2L), schemaEntry.getLastValue());
+      assertEquals(110L, schemaEntry.getLastTime());
     }
   }
 }

Reply via email to