This is an automated email from the ASF dual-hosted git repository. xuekaifeng pushed a commit to branch xkf_id_table in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 72642fb4270410e328e669e222fc2a4c28ec6745 Author: 151250176 <[email protected]> AuthorDate: Tue Dec 7 17:30:24 2021 +0800 add test --- .../engine/storagegroup/StorageGroupProcessor.java | 75 ++--- .../org/apache/iotdb/db/metadata/MManager.java | 75 +++-- .../apache/iotdb/db/metadata/id_table/IDTable.java | 268 ++++++++++++++---- .../db/metadata/id_table/entry/SchemaEntry.java | 9 + .../iotdb/db/qp/physical/crud/InsertPlan.java | 18 +- .../iotdb/db/metadata/id_table/IDManagerTest.java | 5 - .../iotdb/db/metadata/id_table/IDTableTest.java | 315 +++++++++++++++++++++ .../metadata/id_table/entry/SchemaEntryTest.java | 26 +- 8 files changed, 650 insertions(+), 141 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 41edad5..fbcd087 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -18,6 +18,39 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; +import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; +import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -56,6 +89,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.id_table.IDTable; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; @@ -83,44 +117,9 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; - -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Deque; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; - -import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; -import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one * TsFileProcessor in the working status. <br> @@ -291,6 +290,8 @@ public class StorageGroupProcessor { public static final long COMPACTION_TASK_SUBMIT_DELAY = 20L * 1000L; + private IDTable idTable = new IDTable(); + /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */ public ByteBuffer[] getWalDirectByteBuffer() { ByteBuffer[] res = new ByteBuffer[2]; @@ -3303,4 +3304,8 @@ public class StorageGroupProcessor { public ScheduledExecutorService getTimedCompactionScheduleTask() { return timedCompactionScheduleTask; } + + public IDTable getIdTable() { + return idTable; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 433bd31..9a8adbd 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -18,12 +18,33 @@ */ package org.apache.iotdb.db.metadata; +import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine; +import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException; import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; import org.apache.iotdb.db.exception.metadata.DeleteFailedException; @@ -37,6 +58,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.metadata.TemplateIsInUseException; import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException; +import org.apache.iotdb.db.metadata.id_table.IDTable; import org.apache.iotdb.db.metadata.lastCache.LastCacheManager; import org.apache.iotdb.db.metadata.logfile.MLogReader; import org.apache.iotdb.db.metadata.logfile.MLogWriter; @@ -94,31 +116,9 @@ import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; -import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; - /** * This class takes the responsibility of serialization of all the metadata info and persistent it * into files. This class contains all the interfaces to modify the metadata for delta system. All @@ -382,12 +382,12 @@ public class MManager { switch (plan.getOperatorType()) { case CREATE_TIMESERIES: CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan; - createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset()); + createTimeseriesEntry(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset()); break; case CREATE_ALIGNED_TIMESERIES: CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan = (CreateAlignedTimeSeriesPlan) plan; - createAlignedTimeSeries(createAlignedTimeSeriesPlan); + createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan); break; case DELETE_TIMESERIES: DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan; @@ -456,6 +456,20 @@ public class MManager { createTimeseries(plan, -1); } + public void createTimeseriesEntry(CreateTimeSeriesPlan plan, long offset) + throws MetadataException { + createTimeseries(plan, offset); + + // update id table + try { + IDTable idTable = + StorageEngine.getInstance().getProcessor(plan.getPath().getDevicePath()).getIdTable(); + idTable.createTimeseries(plan); + } catch (StorageEngineException e) { + logger.error("get id table error"); + } + } + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException { if (!allowToCreateNewSeries) { @@ -558,6 +572,19 @@ public class MManager { prefixPath, measurements, dataTypes, encodings, compressors, null)); } + public void createAlignedTimeSeriesEntry(CreateAlignedTimeSeriesPlan plan) + throws MetadataException { + createAlignedTimeSeries(plan); + + // update id table + try { + IDTable idTable = StorageEngine.getInstance().getProcessor(plan.getPrefixPath()).getIdTable(); + idTable.createAlignedTimeseries(plan); + } catch (StorageEngineException e) { + logger.error("get id table error"); + } + } + /** * create aligned timeseries * diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java index 2f4ef22..ff9f84d 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/IDTable.java @@ -19,18 +19,37 @@ package org.apache.iotdb.db.metadata.id_table; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; + +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.metadata.id_table.entry.DeviceEntry; +import org.apache.iotdb.db.metadata.id_table.entry.DeviceIDFactory; import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID; +import org.apache.iotdb.db.metadata.id_table.entry.InsertMeasurementMNode; import org.apache.iotdb.db.metadata.id_table.entry.SchemaEntry; import org.apache.iotdb.db.metadata.id_table.entry.TimeseriesID; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.TypeInferenceUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +67,8 @@ public class IDTable { private Map<IDeviceID, DeviceEntry>[] idTables; /** disk schema manager to manage disk schema entry */ private DiskSchemaManager diskSchemaManager; + /** iotdb config */ + protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); public IDTable() { idTables = new Map[NUM_OF_SLOTS]; @@ -56,92 +77,140 @@ public class IDTable { } } - /** - * get device id from device path and check is aligned, - * - * @param seriesKey path of the time series - * @param isAligned whether the insert plan is aligned - * @return reused device id of the timeseries - */ - public synchronized IDeviceID getDeviceID(PartialPath seriesKey, boolean isAligned) + public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan) throws MetadataException { - TimeseriesID timeseriesID = new TimeseriesID(seriesKey); - IDeviceID deviceID = timeseriesID.getDeviceID(); - int slot = calculateSlot(deviceID); - - DeviceEntry deviceEntry = idTables[slot].get(deviceID); - // new device - if (deviceEntry == null) { - deviceEntry = new DeviceEntry(deviceID); - deviceEntry.setAligned(isAligned); - idTables[slot].put(deviceID, deviceEntry); + DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true); - return deviceID; - } - - // check aligned - if (deviceEntry.isAligned() != isAligned) { - throw new MetadataException( - String.format( - "Timeseries under path [%s]'s align value is [%b], which is not consistent with insert plan", - seriesKey.getDevice(), deviceEntry.isAligned())); + for (int i = 0; i < plan.getMeasurements().size(); i++) { + SchemaEntry schemaEntry = + new SchemaEntry( + plan.getDataTypes().get(i), plan.getEncodings().get(i), plan.getCompressors().get(i)); + deviceEntry.putSchemaEntry(plan.getMeasurements().get(i), schemaEntry); } + } - // reuse device id in map - return deviceEntry.getDeviceID(); + public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException { + DeviceEntry deviceEntry = getDeviceEntry(plan.getPath().getDevicePath(), false); + SchemaEntry schemaEntry = + new SchemaEntry(plan.getDataType(), plan.getEncoding(), plan.getCompressor()); + deviceEntry.putSchemaEntry(plan.getPath().getMeasurement(), schemaEntry); } /** * check whether a time series is exist if exist, check the type consistency if not exist, call * MManager to create it * - * @param seriesKey path of the time series - * @param dataType type of the time series * @return measurement MNode of the time series or null if type is not match */ - public synchronized IMeasurementMNode checkOrCreateIfNotExist( - PartialPath seriesKey, TSDataType dataType) { - TimeseriesID timeseriesID = new TimeseriesID(seriesKey); - IDeviceID deviceID = timeseriesID.getDeviceID(); - int slot = calculateSlot(deviceID); + public synchronized IMeasurementMNode getOrCreateMeasurementIfNotExist( + DeviceEntry deviceEntry, InsertPlan plan, int loc) throws MetadataException { + String measurementName = plan.getMeasurements()[loc]; + PartialPath seriesKey = new PartialPath(plan.getDeviceId().toString(), measurementName); - DeviceEntry deviceEntry = - idTables[slot].computeIfAbsent(deviceID, id -> new DeviceEntry(deviceID)); - SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(timeseriesID.getMeasurement()); + SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(measurementName); // if not exist, we create it if (schemaEntry == null) { - schemaEntry = new SchemaEntry(dataType); + if (!config.isAutoCreateSchemaEnabled()) { + throw new PathNotExistException(seriesKey.toString()); + } - // 1. create new timeseries in mmanager + // create new timeseries in mmanager try { - MManager.getInstance() - .createTimeseries( - seriesKey, - dataType, - schemaEntry.getTSEncoding(), - schemaEntry.getCompressionType(), - null); + if (plan.isAligned()) { + List<TSEncoding> encodings = new ArrayList<>(); + List<CompressionType> compressors = new ArrayList<>(); + for (TSDataType dataType : plan.getDataTypes()) { + encodings.add(getDefaultEncoding(dataType)); + compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor()); + } + + CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan = + new CreateAlignedTimeSeriesPlan( + plan.getDeviceId(), + Arrays.asList(plan.getMeasurements()), + Arrays.asList(plan.getDataTypes()), + encodings, + compressors, + null); + + IoTDB.metaManager.createAlignedTimeSeriesEntry(createAlignedTimeSeriesPlan); + } else { + CreateTimeSeriesPlan createTimeSeriesPlan = + new CreateTimeSeriesPlan( + seriesKey, + plan.getDataTypes()[loc], + getDefaultEncoding(plan.getDataTypes()[loc]), + TSFileDescriptor.getInstance().getConfig().getCompressor(), + null, + null, + null, + null); + + IoTDB.metaManager.createTimeseriesEntry(createTimeSeriesPlan, -1); + } } catch (MetadataException e) { - logger.error("create timeseries failed, path is:" + seriesKey + " type is: " + dataType); + logger.error("create timeseries failed, path is:" + seriesKey); } - // 2. insert this schema into id table - deviceEntry.putSchemaEntry(timeseriesID.getMeasurement(), schemaEntry); - - return null; + schemaEntry = deviceEntry.getSchemaEntry(measurementName); } - // type mismatch, we return null and this will be handled by upper level - if (!schemaEntry.getTSDataType().equals(dataType)) { - return null; + return new InsertMeasurementMNode(measurementName, schemaEntry); + } + + public synchronized IDeviceID getSeriesSchemas(InsertPlan plan) throws MetadataException { + PartialPath devicePath = plan.getDeviceId(); + String[] measurementList = plan.getMeasurements(); + IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes(); + + // 1. get device entry and check align + DeviceEntry deviceEntry = getDeviceEntry(devicePath, plan.isAligned()); + + // 2. get schema of each measurement + for (int i = 0; i < measurementList.length; i++) { + try { + // get MeasurementMNode, auto create if absent + try { + IMeasurementMNode measurementMNode = + getOrCreateMeasurementIfNotExist(deviceEntry, plan, i); + + checkDataTypeMatch(plan, i, measurementMNode.getSchema().getType()); + measurementMNodes[i] = measurementMNode; + } catch (DataTypeMismatchException mismatchException) { + if (!config.isEnablePartialInsert()) { + throw mismatchException; + } else { + // mark failed measurement + plan.markFailedMeasurementInsertion(i, mismatchException); + } + } + } catch (MetadataException e) { + if (IoTDB.isClusterMode()) { + logger.debug( + "meet error when check {}.{}, message: {}", + devicePath, + measurementList[i], + e.getMessage()); + } else { + logger.warn( + "meet error when check {}.{}, message: {}", + devicePath, + measurementList[i], + e.getMessage()); + } + if (config.isEnablePartialInsert()) { + // mark failed measurement + plan.markFailedMeasurementInsertion(i, e); + } else { + throw e; + } + } } - return null; + return deviceEntry.getDeviceID(); } - public synchronized IMeasurementMNode getSeriesSchemas(InsertPlan insertPlan) {} - /** * update latest flushed time of one timeseries * @@ -167,13 +236,47 @@ public class IDTable { } /** + * get device id from device path and check is aligned, + * + * @param deviceName device name of the time series + * @param isAligned whether the insert plan is aligned + * @return device entry of the timeseries + */ + private DeviceEntry getDeviceEntry(PartialPath deviceName, boolean isAligned) + throws MetadataException { + IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName); + int slot = calculateSlot(deviceID); + + DeviceEntry deviceEntry = idTables[slot].get(deviceID); + // new device + if (deviceEntry == null) { + deviceEntry = new DeviceEntry(deviceID); + deviceEntry.setAligned(isAligned); + idTables[slot].put(deviceID, deviceEntry); + + return deviceEntry; + } + + // check aligned + if (deviceEntry.isAligned() != isAligned) { + throw new MetadataException( + String.format( + "Timeseries under path [%s]'s align value is [%b], which is not consistent with insert plan", + deviceName, deviceEntry.isAligned())); + } + + // reuse device entry in map + return deviceEntry; + } + + /** * calculate slot that this deviceID should in * * @param deviceID device id * @return slot number */ private int calculateSlot(IDeviceID deviceID) { - return deviceID.hashCode() % NUM_OF_SLOTS; + return Math.abs(deviceID.hashCode()) % NUM_OF_SLOTS; } /** @@ -201,4 +304,47 @@ public class IDTable { return schemaEntry; } + + // from mmanger + private void checkDataTypeMatch(InsertPlan plan, int loc, TSDataType dataType) + throws MetadataException { + // TSDataType insertDataType; + // if (plan instanceof InsertRowPlan) { + // if (!((InsertRowPlan) plan).isNeedInferType()) { + // // only when InsertRowPlan's values is object[], we should check type + // insertDataType = getTypeInLoc(plan, loc); + // } else { + // insertDataType = dataType; + // } + // } else { + // insertDataType = getTypeInLoc(plan, loc); + // } + TSDataType insertDataType = plan.getDataTypes()[loc]; + if (dataType != insertDataType) { + String measurement = plan.getMeasurements()[loc]; + logger.warn( + "DataType mismatch, Insert measurement {} type {}, metadata tree type {}", + measurement, + insertDataType, + dataType); + throw new DataTypeMismatchException(measurement, insertDataType, dataType); + } + } + + /** get dataType of plan, in loc measurements only support InsertRowPlan and InsertTabletPlan */ + private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException { + TSDataType dataType; + if (plan instanceof InsertRowPlan) { + InsertRowPlan tPlan = (InsertRowPlan) plan; + dataType = + TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType()); + } else if (plan instanceof InsertTabletPlan) { + dataType = (plan).getDataTypes()[loc]; + } else { + throw new MetadataException( + String.format( + "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType())); + } + return dataType; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java index 7ecec8f..7daaf8c 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntry.java @@ -56,6 +56,15 @@ public class SchemaEntry implements ILastCacheContainer { flushTime = Long.MIN_VALUE; } + public SchemaEntry(TSDataType dataType, TSEncoding encoding, CompressionType compressionType) { + schema |= dataType.serialize(); + schema |= (((long) encoding.serialize()) << 8); + schema |= (((long) compressionType.serialize()) << 16); + + lastTime = Long.MIN_VALUE; + flushTime = Long.MIN_VALUE; + } + /** * get ts data type from long value of schema * diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index 1d0f5af..a31b459 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -19,17 +19,17 @@ package org.apache.iotdb.db.qp.physical.crud; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - public abstract class InsertPlan extends PhysicalPlan { protected PartialPath deviceId; @@ -40,6 +40,8 @@ public abstract class InsertPlan extends PhysicalPlan { // get from MManager protected IMeasurementMNode[] measurementMNodes; + protected IDeviceID deviceID; + // record the failed measurements, their reasons, and positions in "measurements" List<String> failedMeasurements; private List<Exception> failedExceptions; @@ -187,4 +189,12 @@ public abstract class InsertPlan extends PhysicalPlan { } } } + + public IDeviceID getDeviceID() { + return deviceID; + } + + public void setDeviceID(IDeviceID deviceID) { + this.deviceID = deviceID; + } } diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDManagerTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDManagerTest.java deleted file mode 100644 index 585a974..0000000 --- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDManagerTest.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.apache.iotdb.db.metadata.id_table; - -public class IDManagerTest { - -} diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java index 0c8f5f2..fbdc776 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/IDTableTest.java @@ -1,5 +1,320 @@ +/// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ + package org.apache.iotdb.db.metadata.id_table; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.metadata.id_table.entry.IDeviceID; +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; +import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + public class IDTableTest { + private CompressionType compressionType; + + @Before + public void setUp() { + compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor(); + EnvironmentUtils.envSetUp(); + } + + @After + public void tearDown() throws Exception { + EnvironmentUtils.cleanEnv(); + } + + @Test + public void testCreateAlignedTimeseriesAndInsert() { + MManager manager = IoTDB.metaManager; + + try { + manager.setStorageGroup(new PartialPath("root.laptop")); + CreateAlignedTimeSeriesPlan plan = + new CreateAlignedTimeSeriesPlan( + new PartialPath("root.laptop.d1.aligned_device"), + Arrays.asList("s1", "s2", "s3"), + Arrays.asList( + TSDataType.valueOf("FLOAT"), + TSDataType.valueOf("INT64"), + TSDataType.valueOf("INT32")), + Arrays.asList( + TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")), + Arrays.asList(compressionType, compressionType, compressionType), + null); + + manager.createAlignedTimeSeriesEntry(plan); + + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = + new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32}; + + String[] columns = new String[3]; + columns[0] = 2.0 + ""; + columns[1] = 10000 + ""; + columns[2] = 100 + ""; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2", "s3"}, + dataTypes, + columns, + true); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + idTable.getSeriesSchemas(insertRowPlan); + + // with type mismatch + dataTypes = new TSDataType[] {TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.INT32}; + InsertRowPlan insertRowPlan2 = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2", "s3"}, + dataTypes, + columns, + true); + insertRowPlan2.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // we should throw type mismatch exception here + try { + IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false); + idTable.getSeriesSchemas(insertRowPlan2); + fail(); + } catch (Exception e) { + + } + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateAlignedTimeseriesAndInsertNotAlignedData() { + MManager manager = IoTDB.metaManager; + + try { + manager.setStorageGroup(new PartialPath("root.laptop")); + CreateAlignedTimeSeriesPlan plan = + new CreateAlignedTimeSeriesPlan( + new PartialPath("root.laptop.d1.aligned_device"), + Arrays.asList("s1", "s2", "s3"), + Arrays.asList( + TSDataType.valueOf("FLOAT"), + TSDataType.valueOf("INT64"), + TSDataType.valueOf("INT32")), + Arrays.asList( + TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")), + Arrays.asList(compressionType, compressionType, compressionType), + null); + + manager.createAlignedTimeSeriesEntry(plan); + + IDTable idTable = + StorageEngine.getInstance().getProcessor(new PartialPath("root.laptop")).getIdTable(); + + // construct an insertRowPlan with mismatched data type + long time = 1L; + TSDataType[] dataTypes = + new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32}; + + String[] columns = new String[3]; + columns[0] = 2.0 + ""; + columns[1] = 10000 + ""; + columns[2] = 100 + ""; + + InsertRowPlan insertRowPlan = + new InsertRowPlan( + new PartialPath("root.laptop.d1.aligned_device"), + time, + new String[] {"s1", "s2", "s3"}, + dataTypes, + columns, + true); + insertRowPlan.setMeasurementMNodes( + new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + + // call getSeriesSchemasAndReadLockDevice + IDeviceID deviceID = idTable.getSeriesSchemas(insertRowPlan); + // assertEquals(3, manager.getAllTimeseriesCount(node.getPartialPath().concatNode("**"))); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // @Test + // public void testCreateAlignedTimeseriesAndInsertWithNotAlignedData() { + // MManager manager = IoTDB.metaManager; + // try { + // manager.setStorageGroup(new PartialPath("root.laptop")); + // manager.createAlignedTimeSeries( + // new PartialPath("root.laptop.d1.aligned_device"), + // Arrays.asList("s1", "s2", "s3"), + // Arrays.asList( + // TSDataType.valueOf("FLOAT"), + // TSDataType.valueOf("INT64"), + // TSDataType.valueOf("INT32")), + // Arrays.asList( + // TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")), + // Arrays.asList(compressionType, compressionType, compressionType)); + // + // // construct an insertRowPlan with mismatched data type + // long time = 1L; + // TSDataType[] dataTypes = + // new TSDataType[] {TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32}; + // + // String[] columns = new String[3]; + // columns[0] = "1.0"; + // columns[1] = "2"; + // columns[2] = "3"; + // + // InsertRowPlan insertRowPlan = + // new InsertRowPlan( + // new PartialPath("root.laptop.d1.aligned_device"), + // time, + // new String[] {"s1", "s2", "s3"}, + // dataTypes, + // columns, + // false); + // insertRowPlan.setMeasurementMNodes( + // new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + // + // // call getSeriesSchemasAndReadLockDevice + // manager.getSeriesSchemasAndReadLockDevice(insertRowPlan); + // } catch (Exception e) { + // e.printStackTrace(); + // Assert.assertEquals( + // "Timeseries under path [root.laptop.d1.aligned_device] is aligned , please + // setInsertPlan.isAligned() = true", + // e.getMessage()); + // } + // } + // + // @Test + // public void testCreateTimeseriesAndInsertWithMismatchDataType() { + // MManager manager = IoTDB.metaManager; + // try { + // manager.setStorageGroup(new PartialPath("root.laptop")); + // manager.createTimeseries( + // new PartialPath("root.laptop.d1.s0"), + // TSDataType.valueOf("INT32"), + // TSEncoding.valueOf("RLE"), + // compressionType, + // Collections.emptyMap()); + // + // // construct an insertRowPlan with mismatched data type + // long time = 1L; + // TSDataType[] dataTypes = new TSDataType[] {TSDataType.FLOAT}; + // + // String[] columns = new String[1]; + // columns[0] = 2.0 + ""; + // + // InsertRowPlan insertRowPlan = + // new InsertRowPlan( + // new PartialPath("root.laptop.d1"), time, new String[] {"s0"}, dataTypes, columns); + // insertRowPlan.setMeasurementMNodes( + // new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + // + // // call getSeriesSchemasAndReadLockDevice + // IMNode node = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan); + // assertEquals(1, manager.getAllTimeseriesCount(node.getPartialPath().concatNode("**"))); + // assertNull(insertRowPlan.getMeasurementMNodes()[0]); + // assertEquals(1, insertRowPlan.getFailedMeasurementNumber()); + // + // } catch (Exception e) { + // e.printStackTrace(); + // fail(e.getMessage()); + // } + // } + // + // @Test + // public void testCreateTimeseriesAndInsertWithAlignedData() { + // MManager manager = IoTDB.metaManager; + // try { + // manager.setStorageGroup(new PartialPath("root.laptop")); + // manager.createTimeseries( + // new PartialPath("root.laptop.d1.aligned_device.s1"), + // TSDataType.valueOf("INT32"), + // TSEncoding.valueOf("RLE"), + // compressionType, + // Collections.emptyMap()); + // manager.createTimeseries( + // new PartialPath("root.laptop.d1.aligned_device.s2"), + // TSDataType.valueOf("INT64"), + // TSEncoding.valueOf("RLE"), + // compressionType, + // Collections.emptyMap()); + // + // // construct an insertRowPlan with mismatched data type + // long time = 1L; + // TSDataType[] dataTypes = new TSDataType[] {TSDataType.INT32, TSDataType.INT64}; + // + // String[] columns = new String[2]; + // columns[0] = "1"; + // columns[1] = "2"; + // + // InsertRowPlan insertRowPlan = + // new InsertRowPlan( + // new PartialPath("root.laptop.d1.aligned_device"), + // time, + // new String[] {"s1", "s2"}, + // dataTypes, + // columns, + // true); + // insertRowPlan.setMeasurementMNodes( + // new IMeasurementMNode[insertRowPlan.getMeasurements().length]); + // + // // call getSeriesSchemasAndReadLockDevice + // manager.getSeriesSchemasAndReadLockDevice(insertRowPlan); + // } catch (Exception e) { + // e.printStackTrace(); + // Assert.assertEquals( + // "Timeseries under path [root.laptop.d1.aligned_device] is not aligned , please set + // InsertPlan.isAligned() = false", + // e.getMessage()); + // } + // } } diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java index 34f490b..e5c83f8 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/id_table/entry/SchemaEntryTest.java @@ -25,7 +25,8 @@ import static org.junit.Assert.assertEquals; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.junit.Test; public class SchemaEntryTest { @@ -53,19 +54,20 @@ public class SchemaEntryTest { assertEquals(schemaEntry.getFlushTime(), 100); // last cache - Object o1 = new Object(); - Object o2 = new Object(); - schemaEntry.updateLastCache(new Pair<>(100L, o1)); - assertEquals(schemaEntry.getLastValue(), o1); - assertEquals(schemaEntry.getLastTime(), 100L); + schemaEntry.updateCachedLast( + new TimeValuePair(100L, new TsPrimitiveType.TsLong(1L)), false, 0L); + assertEquals(new TsPrimitiveType.TsLong(1L), schemaEntry.getLastValue()); + assertEquals(100L, schemaEntry.getLastTime()); - schemaEntry.updateLastCache(new Pair<>(90L, o2)); - assertEquals(schemaEntry.getLastValue(), o1); - assertEquals(schemaEntry.getLastTime(), 100L); + schemaEntry.updateCachedLast( + new TimeValuePair(90L, new TsPrimitiveType.TsLong(2L)), false, 0L); + assertEquals(new TsPrimitiveType.TsLong(1L), schemaEntry.getLastValue()); + assertEquals(100L, schemaEntry.getLastTime()); - schemaEntry.updateLastCache(new Pair<>(110L, o2)); - assertEquals(schemaEntry.getLastValue(), o2); - assertEquals(schemaEntry.getLastTime(), 110L); + schemaEntry.updateCachedLast( + new TimeValuePair(110L, new TsPrimitiveType.TsLong(2L)), false, 0L); + assertEquals(new TsPrimitiveType.TsLong(2L), schemaEntry.getLastValue()); + assertEquals(110L, schemaEntry.getLastTime()); } } }
