This is an automated email from the ASF dual-hosted git repository. xuekaifeng pushed a commit to branch xkf_fix_idtable_compaction in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cfc121917f9204c4c87bc788fc10e8896936bf87 Author: 151250176 <[email protected]> AuthorDate: Mon Mar 14 15:51:55 2022 +0800 fix compaction bug --- .../integration/IoTDBCompactionITWithIDTable.java | 352 +++++++++++++++++++++ .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 40 ++- .../db/engine/compaction/CompactionUtils.java | 19 +- .../manage/CrossSpaceCompactionResource.java | 6 - .../inner/utils/InnerSpaceCompactionUtils.java | 8 +- .../apache/iotdb/db/metadata/idtable/IDTable.java | 10 + .../db/metadata/idtable/IDTableHashmapImpl.java | 28 ++ .../iotdb/db/metadata/idtable/IDTableManager.java | 21 ++ 8 files changed, 461 insertions(+), 23 deletions(-) diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionITWithIDTable.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionITWithIDTable.java new file mode 100644 index 0000000..90d1701 --- /dev/null +++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionITWithIDTable.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.integration; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; +import org.apache.iotdb.integration.env.ConfigFactory; +import org.apache.iotdb.integration.env.EnvFactory; +import org.apache.iotdb.itbase.category.ClusterTest; +import org.apache.iotdb.itbase.category.LocalStandaloneTest; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@Category({LocalStandaloneTest.class, ClusterTest.class}) +public class IoTDBCompactionITWithIDTable { + + private static final Logger logger = LoggerFactory.getLogger(IoTDBCompactionIT.class); + private long prevPartitionInterval; + + private static boolean isEnableIDTable = false; + + private static String originalDeviceIDTransformationMethod = null; + + @Before + public void setUp() throws Exception { + prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(); + ConfigFactory.getConfig().setPartitionInterval(1); + isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable(); + originalDeviceIDTransformationMethod = + IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod(); + + ConfigFactory.getConfig().setEnableIDTable(true); + ConfigFactory.getConfig().setDeviceIDTransformationMethod("SHA256"); + + EnvFactory.getEnv().initBeforeTest(); + } + + @After + public void tearDown() throws Exception { + ConfigFactory.getConfig().setEnableIDTable(isEnableIDTable); + ConfigFactory.getConfig().setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod); + + EnvFactory.getEnv().cleanAfterTest(); + ConfigFactory.getConfig().setPartitionInterval(prevPartitionInterval); + } + + @Test + public void testOverlap() throws SQLException { + logger.info("test..."); + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("SET STORAGE GROUP TO root.mergeTest"); + try { + statement.execute("CREATE TIMESERIES root.mergeTest.s1 WITH DATATYPE=INT64,ENCODING=PLAIN"); + } catch (SQLException e) { + // ignore + } + + statement.execute( + String.format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 1, 1)); + statement.execute( + String.format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 2, 2)); + statement.execute("FLUSH"); + statement.execute( + String.format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 5, 5)); + statement.execute( + String.format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 6, 6)); + statement.execute("FLUSH"); + statement.execute( + String.format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 2, 3)); + statement.execute( + String.format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 3, 3)); + statement.execute("FLUSH"); + + try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) { + int cnt = 0; + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + long s1 = resultSet.getLong("root.mergeTest.s1"); + if (time == 2) { + assertEquals(3, s1); + } else { + assertEquals(time, s1); + } + cnt++; + } + assertEquals(5, cnt); + } + } + } + + @Test + public void test() throws SQLException { + logger.info("test..."); + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("SET STORAGE GROUP TO root.mergeTest"); + for (int i = 1; i <= 3; i++) { + try { + statement.execute( + "CREATE TIMESERIES root.mergeTest.s" + + i + + " WITH DATATYPE=INT64," + + "ENCODING=PLAIN"); + } catch (SQLException e) { + // ignore + } + } + + for (int i = 0; i < 10; i++) { + logger.info("Running the {} round merge", i); + for (int j = i * 10 + 1; j <= (i + 1) * 10; j++) { + statement.addBatch( + String.format( + "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d," + "%d,%d)", + j, j + 1, j + 2, j + 3)); + } + statement.executeBatch(); + statement.execute("FLUSH"); + for (int j = i * 10 + 1; j <= (i + 1) * 10; j++) { + statement.addBatch( + String.format( + "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d," + "%d,%d)", + j, j + 10, j + 20, j + 30)); + } + statement.executeBatch(); + statement.execute("FLUSH"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + int cnt; + try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) { + cnt = 0; + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + long s1 = resultSet.getLong("root.mergeTest.s1"); + long s2 = resultSet.getLong("root.mergeTest.s2"); + long s3 = resultSet.getLong("root.mergeTest.s3"); + assertEquals(time + 10, s1); + assertEquals(time + 20, s2); + assertEquals(time + 30, s3); + cnt++; + } + } + assertEquals((i + 1) * 10, cnt); + } + } + } + + @Test + public void testInvertedOrder() { + logger.info("testInvertedOrder..."); + // case: seq data and unseq data are written in reverted order + // e.g.: write 1. seq [10, 20), 2. seq [20, 30), 3. unseq [20, 30), 4. unseq [10, 20) + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("SET STORAGE GROUP TO root.mergeTest"); + for (int i = 1; i <= 3; i++) { + try { + statement.execute( + "CREATE TIMESERIES root.mergeTest.s" + + i + + " WITH DATATYPE=INT64," + + "ENCODING=PLAIN"); + } catch (SQLException e) { + // ignore + } + } + + for (int j = 10; j < 20; j++) { + statement.addBatch( + String.format( + "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d," + "%d,%d)", + j, j + 1, j + 2, j + 3)); + } + statement.executeBatch(); + statement.execute("FLUSH"); + for (int j = 20; j < 30; j++) { + statement.addBatch( + String.format( + "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d," + "%d,%d)", + j, j + 1, j + 2, j + 3)); + } + statement.executeBatch(); + statement.execute("FLUSH"); + + for (int j = 20; j < 30; j++) { + statement.addBatch( + String.format( + "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d," + "%d,%d)", + j, j + 10, j + 20, j + 30)); + } + statement.executeBatch(); + statement.execute("FLUSH"); + for (int j = 10; j < 20; j++) { + statement.addBatch( + String.format( + "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d," + "%d,%d)", + j, j + 10, j + 20, j + 30)); + } + statement.executeBatch(); + statement.execute("FLUSH"); + + statement.execute("MERGE"); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + int cnt; + try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) { + cnt = 0; + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + long s1 = resultSet.getLong("root.mergeTest.s1"); + long s2 = resultSet.getLong("root.mergeTest.s2"); + long s3 = resultSet.getLong("root.mergeTest.s3"); + assertEquals(cnt + 10, time); + assertEquals(time + 10, s1); + assertEquals(time + 20, s2); + assertEquals(time + 30, s3); + cnt++; + } + } + assertEquals(20, cnt); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCrossPartition() throws SQLException { + logger.info("testCrossPartition..."); + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("SET STORAGE GROUP TO root.mergeTest"); + for (int i = 1; i <= 3; i++) { + try { + statement.execute( + "CREATE TIMESERIES root.mergeTest.s" + + i + + " WITH DATATYPE=INT64," + + "ENCODING=PLAIN"); + } catch (SQLException e) { + // ignore + } + } + + // file in partition + for (int k = 0; k < 7; k++) { + // partition num + for (int i = 0; i < 10; i++) { + // sequence files + for (int j = i * 1000 + 300 + k * 100; j <= i * 1000 + 399 + k * 100; j++) { + statement.execute( + String.format( + "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d," + "%d,%d)", + j, j + 1, j + 2, j + 3)); + } + statement.execute("FLUSH"); + // unsequence files + for (int j = i * 1000 + k * 100; j <= i * 1000 + 99 + k * 100; j++) { + statement.execute( + String.format( + "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d," + "%d,%d)", + j, j + 10, j + 20, j + 30)); + } + statement.execute("FLUSH"); + } + } + + statement.execute("MERGE"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + + } + + long totalTime = 0; + while (CompactionTaskManager.currentTaskNum.get() > 0) { + // wait + try { + Thread.sleep(1000); + totalTime += 1000; + if (totalTime > 240_000) { + fail(); + break; + } + } catch (InterruptedException e) { + + } + } + int cnt; + try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) { + cnt = 0; + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + long s1 = resultSet.getLong("root.mergeTest.s1"); + long s2 = resultSet.getLong("root.mergeTest.s2"); + long s3 = resultSet.getLong("root.mergeTest.s3"); + assertEquals(cnt, time); + if (time % 1000 < 700) { + assertEquals(time + 10, s1); + assertEquals(time + 20, s2); + assertEquals(time + 30, s3); + } else { + assertEquals(time + 1, s1); + assertEquals(time + 2, s2); + assertEquals(time + 3, s3); + } + cnt++; + } + } + assertEquals(10000, cnt); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java index d4492c7..879c748 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java @@ -18,17 +18,6 @@ */ package org.apache.iotdb.db.conf; -import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; -import org.apache.iotdb.db.exception.ConfigurationException; -import org.apache.iotdb.tsfile.common.conf.TSFileConfig; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; -import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; -import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; - -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -39,6 +28,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; +import org.apache.iotdb.db.exception.ConfigurationException; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IoTDBConfigCheck { @@ -89,6 +87,12 @@ public class IoTDBConfigCheck { private static final String VIRTUAL_STORAGE_GROUP_NUM = "virtual_storage_group_num"; private static String virtualStorageGroupNum = String.valueOf(config.getVirtualStorageGroupNum()); + private static final String ENABLE_ID_TABLE = "enable_id_table"; + private static String enableIDTable = String.valueOf(config.isEnableIDTable()); + + private static final String ENABLE_ID_TABLE_LOG_FILE = "enable_id_table_log_file"; + private static String enableIdTableLogFile = String.valueOf(config.isEnableIDTableLogFile()); + private static final String TIME_ENCODER_KEY = "time_encoder"; private static String timeEncoderValue = String.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); @@ -149,6 +153,8 @@ public class IoTDBConfigCheck { systemProperties.put(MAX_DEGREE_OF_INDEX_STRING, maxDegreeOfIndexNode); systemProperties.put(VIRTUAL_STORAGE_GROUP_NUM, virtualStorageGroupNum); systemProperties.put(TIME_ENCODER_KEY, timeEncoderValue); + systemProperties.put(ENABLE_ID_TABLE, enableIDTable); + systemProperties.put(ENABLE_ID_TABLE_LOG_FILE, enableIdTableLogFile); } /** @@ -339,6 +345,18 @@ public class IoTDBConfigCheck { if (!(properties.getProperty(TIME_ENCODER_KEY).equals(timeEncoderValue))) { throwException(TIME_ENCODER_KEY, timeEncoderValue); } + + if (!(properties.getProperty(TIME_ENCODER_KEY).equals(timeEncoderValue))) { + throwException(TIME_ENCODER_KEY, timeEncoderValue); + } + + if (!(properties.getProperty(ENABLE_ID_TABLE).equals(enableIDTable))) { + throwException(ENABLE_ID_TABLE, enableIDTable); + } + + if (!(properties.getProperty(ENABLE_ID_TABLE_LOG_FILE).equals(enableIdTableLogFile))) { + throwException(ENABLE_ID_TABLE_LOG_FILE, enableIdTableLogFile); + } } private void throwException(String parameter, Object badValue) throws ConfigurationException { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java index 906ead9..8324cdd 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.compaction; import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter; import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter; @@ -32,6 +33,7 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.metadata.idtable.IDTableManager; import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.path.PartialPath; @@ -124,10 +126,13 @@ public class CompactionUtils { Set<String> allMeasurements = alignedMeasurementIterator.getAllMeasurements(); List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); for (String measurement : allMeasurements) { - // TODO: use IDTable try { - measurementSchemas.add( - IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement))); + if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) { + measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement)); + } else { + measurementSchemas.add( + IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement))); + } } catch (PathNotExistException e) { logger.info("A deleted path is skipped: {}", e.getMessage()); } @@ -173,8 +178,12 @@ public class CompactionUtils { for (String measurement : allMeasurements) { List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); try { - measurementSchemas.add( - IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement))); + if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) { + measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement)); + } else { + measurementSchemas.add( + IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement))); + } } catch (PathNotExistException e) { logger.info("A deleted path is skipped: {}", e.getMessage()); continue; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java index a19fef7..7ced2fb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java @@ -21,9 +21,7 @@ package org.apache.iotdb.db.engine.compaction.cross.rewrite.manage; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.path.PartialPath; -import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; @@ -111,10 +109,6 @@ public class CrossSpaceCompactionResource { chunkWriterCache.clear(); } - public IMeasurementSchema getSchema(PartialPath path) throws MetadataException { - return IoTDB.metaManager.getSeriesSchema(path); - } - /** * Construct the a new or get an existing TsFileSequenceReader of a TsFile. * diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java index 91f5a9b..0fef121 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.metadata.idtable.IDTableManager; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.service.IoTDB; @@ -120,7 +121,12 @@ public class InnerSpaceCompactionUtils { LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList = seriesIterator.getMetadataListForCurrentSeries(); try { - measurementSchema = IoTDB.metaManager.getSeriesSchema(p); + if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) { + measurementSchema = + IDTableManager.getInstance().getSeriesSchema(device, p.getMeasurement()); + } else { + measurementSchema = IoTDB.metaManager.getSeriesSchema(p); + } } catch (PathNotExistException e) { logger.info("A deleted path is skipped: {}", e.getMessage()); continue; diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java index 5ab0d47..4c32feb 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,6 +132,15 @@ public interface IDTable { public DeviceEntry getDeviceEntry(String deviceName); /** + * get schema from device and measurements + * + * @param deviceName device name of the time series + * @param measurementName measurement name of the time series + * @return schema entry of the timeseries + */ + public IMeasurementSchema getSeriesSchema(String deviceName, String measurementName); + + /** * get all device entries * * @return all device entries diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java index fb21776..fb7732c 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -275,6 +276,33 @@ public class IDTableHashmapImpl implements IDTable { return idTables[slot].get(deviceID); } + /** + * get schema from device and measurements + * + * @param deviceName device name of the time series + * @param measurementName measurement name of the time series + * @return schema entry of the timeseries + */ + @Override + public IMeasurementSchema getSeriesSchema(String deviceName, String measurementName) { + DeviceEntry deviceEntry = getDeviceEntry(deviceName); + if (deviceEntry == null) { + return null; + } + + SchemaEntry schemaEntry = deviceEntry.getSchemaEntry(measurementName); + if (schemaEntry == null) { + return null; + } + + // build measurement schema + return new MeasurementSchema( + measurementName, + schemaEntry.getTSDataType(), + schemaEntry.getTSEncoding(), + schemaEntry.getCompressionType()); + } + @Override public List<DeviceEntry> getAllDeviceEntry() { List<DeviceEntry> res = new ArrayList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableManager.java index 02026b9..c1ff2b1 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableManager.java @@ -21,10 +21,12 @@ package org.apache.iotdb.db.metadata.idtable; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.utils.FilePathUtils; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,6 +96,25 @@ public class IDTableManager { return null; } + /** + * get schema from device and measurements + * + * @param deviceName device name of the time series + * @param measurementName measurement name of the time series + * @return schema entry of the time series + */ + public synchronized IMeasurementSchema getSeriesSchema(String deviceName, String measurementName) + throws MetadataException { + for (IDTable idTable : idTableMap.values()) { + IMeasurementSchema measurementSchema = idTable.getSeriesSchema(deviceName, measurementName); + if (measurementSchema != null) { + return measurementSchema; + } + } + + throw new PathNotExistException(new PartialPath(deviceName, measurementName).toString()); + } + /** clear id table map */ public void clear() throws IOException { for (IDTable idTable : idTableMap.values()) {
