This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch IMeasurementSchema in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2113864fb19a78ba68c6a7d0b6522d41a4682621 Author: samperson1997 <[email protected]> AuthorDate: Thu Mar 11 16:33:31 2021 +0800 [IOTDB-1203] Create interface of IMeasurementSchema --- .../java/org/apache/iotdb/cluster/ClientMain.java | 5 +- .../cluster/client/sync/SyncClientAdaptor.java | 6 +-- .../apache/iotdb/cluster/metadata/CMManager.java | 14 ++--- .../apache/iotdb/cluster/metadata/MetaPuller.java | 15 +++--- .../iotdb/cluster/query/LocalQueryExecutor.java | 6 +-- .../caller/PullMeasurementSchemaHandler.java | 7 +-- .../cluster/client/sync/SyncClientAdaptorTest.java | 5 +- .../org/apache/iotdb/cluster/common/IoTDBTest.java | 4 +- .../org/apache/iotdb/cluster/common/TestUtils.java | 9 ++-- .../caller/PullMeasurementSchemaHandlerTest.java | 10 ++-- .../cluster/server/member/DataGroupMemberTest.java | 3 +- .../cluster/server/member/MetaGroupMemberTest.java | 10 ++-- .../iotdb/db/engine/flush/MemTableFlushTask.java | 8 +-- .../iotdb/db/engine/flush/NotifyFlushMemTable.java | 4 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 8 +-- .../apache/iotdb/db/engine/memtable/IMemTable.java | 4 +- .../db/engine/memtable/IWritableMemChunk.java | 4 +- .../db/engine/memtable/PrimitiveMemTable.java | 4 +- .../iotdb/db/engine/memtable/WritableMemChunk.java | 8 +-- .../db/engine/merge/manage/MergeResource.java | 12 ++--- .../db/engine/merge/task/MergeMultiChunkTask.java | 6 +-- .../iotdb/db/engine/merge/task/MergeTask.java | 4 +- .../engine/storagegroup/StorageGroupProcessor.java | 4 +- .../apache/iotdb/db/metadata/MLogTxtWriter.java | 4 +- .../org/apache/iotdb/db/metadata/MManager.java | 13 ++--- .../java/org/apache/iotdb/db/metadata/MTree.java | 8 +-- .../apache/iotdb/db/metadata/MeasurementMeta.java | 14 ++--- .../iotdb/db/metadata/mnode/MeasurementMNode.java | 11 ++-- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 8 +-- .../db/qp/physical/sys/MeasurementMNodePlan.java | 9 ++-- .../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 29 +++++----- .../org/apache/iotdb/db/utils/SchemaUtils.java | 3 +- .../query/reader/series/SeriesReaderTestUtil.java | 3 +- .../db/writelog/recover/SeqTsFileRecoverTest.java | 3 +- .../java/org/apache/iotdb/session/Session.java | 6 +-- .../iotdb/spark/tsfile/NarrowConverter.scala | 4 +- .../apache/iotdb/spark/tsfile/WideConverter.scala | 5 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 13 ++--- .../apache/iotdb/tsfile/write/TsFileWriter.java | 13 ++--- .../tsfile/write/chunk/ChunkGroupWriterImpl.java | 3 +- .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 8 +-- .../tsfile/write/chunk/IChunkGroupWriter.java | 4 +- .../apache/iotdb/tsfile/write/page/PageWriter.java | 6 +-- .../tsfile/write/schema/IMeasurementSchema.java | 62 ++++++++++++++++++++++ .../tsfile/write/schema/MeasurementSchema.java | 38 ++++++++++++- .../apache/iotdb/tsfile/write/schema/Schema.java | 25 ++++----- .../write/writer/RestorableTsFileIOWriter.java | 8 +-- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 4 +- .../org/apache/iotdb/tsfile/utils/RecordUtils.java | 4 +- .../tsfile/write/DefaultDeviceTemplateTest.java | 3 +- .../write/schema/converter/SchemaBuilderTest.java | 17 +++--- 51 files changed, 302 insertions(+), 186 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java index 7543f4a..cf30381 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java @@ -41,6 +41,7 @@ import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.session.SessionDataSet; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.commons.cli.CommandLine; @@ -115,7 +116,7 @@ public class ClientMain { private static final TSDataType[] DATA_TYPES = new TSDataType[] {TSDataType.DOUBLE}; - private static List<MeasurementSchema> schemas; + private static List<IMeasurementSchema> schemas; private static final String[] DATA_QUERIES = new String[] { @@ -364,7 +365,7 @@ public class ClientMain { private static void registerTimeseries(long sessionId, Client client) throws TException { TSCreateTimeseriesReq req = new TSCreateTimeseriesReq(); req.setSessionId(sessionId); - for (MeasurementSchema schema : schemas) { + for (IMeasurementSchema schema : schemas) { req.setDataType(schema.getType().ordinal()); req.setEncoding(schema.getEncodingType().ordinal()); req.setCompressor(schema.getCompressor().ordinal()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java index d518c62..2e43741 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java @@ -61,7 +61,7 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.apache.thrift.TException; @@ -268,10 +268,10 @@ public class SyncClientAdaptor { return response.get(); } - public static List<MeasurementSchema> pullMeasurementSchema( + public static List<IMeasurementSchema> pullMeasurementSchema( AsyncDataClient client, PullSchemaRequest pullSchemaRequest) throws TException, InterruptedException { - AtomicReference<List<MeasurementSchema>> measurementSchemas = new AtomicReference<>(); + AtomicReference<List<IMeasurementSchema>> measurementSchemas = new AtomicReference<>(); client.pullMeasurementSchema( pullSchemaRequest, diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index 39e6e8f..223e383 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -76,7 +76,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.apache.thrift.TException; @@ -189,10 +189,10 @@ public class CMManager extends MManager { seriesType = super.getSeriesType(path); } catch (PathNotExistException e) { // pull from remote node - List<MeasurementSchema> schemas = + List<IMeasurementSchema> schemas = metaPuller.pullMeasurementSchemas(Collections.singletonList(path)); if (!schemas.isEmpty()) { - MeasurementSchema measurementSchema = schemas.get(0); + IMeasurementSchema measurementSchema = schemas.get(0); MeasurementMNode measurementMNode = new MeasurementMNode( null, measurementSchema.getMeasurementId(), measurementSchema, null); @@ -269,8 +269,8 @@ public class CMManager extends MManager { for (String s : measurementList) { schemasToPull.add(deviceId.concatNode(s)); } - List<MeasurementSchema> schemas = metaPuller.pullMeasurementSchemas(schemasToPull); - for (MeasurementSchema schema : schemas) { + List<IMeasurementSchema> schemas = metaPuller.pullMeasurementSchemas(schemasToPull); + for (IMeasurementSchema schema : schemas) { // TODO-Cluster: also pull alias? MeasurementMNode measurementMNode = new MeasurementMNode(null, schema.getMeasurementId(), schema, null); @@ -337,10 +337,10 @@ public class CMManager extends MManager { } @Override - public MeasurementSchema getSeriesSchema(PartialPath device, String measurement) + public IMeasurementSchema getSeriesSchema(PartialPath device, String measurement) throws MetadataException { try { - MeasurementSchema measurementSchema = super.getSeriesSchema(device, measurement); + IMeasurementSchema measurementSchema = super.getSeriesSchema(device, measurement); if (measurementSchema != null) { return measurementSchema; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java index 13ece7b..57a6286 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java @@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.thrift.TException; @@ -77,7 +78,7 @@ public class MetaPuller { * Pull the all timeseries schemas of given prefixPaths from remote nodes. All prefixPaths must * contain the storage group. */ - List<MeasurementSchema> pullMeasurementSchemas(List<PartialPath> prefixPaths) + List<IMeasurementSchema> pullMeasurementSchemas(List<PartialPath> prefixPaths) throws MetadataException { logger.debug("{}: Pulling timeseries schemas of {}", metaGroupMember.getName(), prefixPaths); // split the paths by the data groups that will hold them @@ -88,7 +89,7 @@ public class MetaPuller { partitionGroupPathMap.computeIfAbsent(partitionGroup, g -> new ArrayList<>()).add(prefixPath); } - List<MeasurementSchema> schemas = new ArrayList<>(); + List<IMeasurementSchema> schemas = new ArrayList<>(); // pull timeseries schema from every group involved if (logger.isDebugEnabled()) { logger.debug( @@ -127,7 +128,7 @@ public class MetaPuller { private void pullMeasurementSchemas( PartitionGroup partitionGroup, List<PartialPath> prefixPaths, - List<MeasurementSchema> results) { + List<IMeasurementSchema> results) { if (partitionGroup.contains(metaGroupMember.getThisNode())) { // the node is in the target group, synchronize with leader should be enough try { @@ -166,7 +167,7 @@ public class MetaPuller { } private boolean pullMeasurementSchemas( - Node node, PullSchemaRequest request, List<MeasurementSchema> results) { + Node node, PullSchemaRequest request, List<IMeasurementSchema> results) { if (logger.isDebugEnabled()) { logger.debug( "{}: Pulling timeseries schemas of {} and other {} paths from {}", @@ -176,7 +177,7 @@ public class MetaPuller { node); } - List<MeasurementSchema> schemas = null; + List<IMeasurementSchema> schemas = null; try { schemas = pullMeasurementSchemas(node, request); } catch (IOException | TException e) { @@ -215,9 +216,9 @@ public class MetaPuller { return false; } - private List<MeasurementSchema> pullMeasurementSchemas(Node node, PullSchemaRequest request) + private List<IMeasurementSchema> pullMeasurementSchemas(Node node, PullSchemaRequest request) throws TException, InterruptedException, IOException { - List<MeasurementSchema> schemas; + List<IMeasurementSchema> schemas; if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { AsyncDataClient client = metaGroupMember diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java index 028b3fc..e2767d9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java @@ -67,7 +67,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.slf4j.Logger; @@ -302,7 +302,7 @@ public class LocalQueryExecutor { // collect local timeseries schemas and send to the requester // the measurements in them are the full paths. List<String> prefixPaths = request.getPrefixPaths(); - List<MeasurementSchema> measurementSchemas = new ArrayList<>(); + List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); for (String prefixPath : prefixPaths) { getCMManager().collectSeries(new PartialPath(prefixPath), measurementSchemas); } @@ -321,7 +321,7 @@ public class LocalQueryExecutor { DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); try { dataOutputStream.writeInt(measurementSchemas.size()); - for (MeasurementSchema timeseriesSchema : measurementSchemas) { + for (IMeasurementSchema timeseriesSchema : measurementSchemas) { timeseriesSchema.serializeTo(dataOutputStream); } } catch (IOException ignored) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/PullMeasurementSchemaHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/PullMeasurementSchemaHandler.java index aec74db..6eceac0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/PullMeasurementSchemaHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/PullMeasurementSchemaHandler.java @@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server.handlers.caller; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.thrift.async.AsyncMethodCallback; @@ -38,12 +39,12 @@ public class PullMeasurementSchemaHandler implements AsyncMethodCallback<PullSch private Node owner; private List<String> prefixPaths; - private AtomicReference<List<MeasurementSchema>> timeseriesSchemas; + private AtomicReference<List<IMeasurementSchema>> timeseriesSchemas; public PullMeasurementSchemaHandler( Node owner, List<String> prefixPaths, - AtomicReference<List<MeasurementSchema>> timeseriesSchemas) { + AtomicReference<List<IMeasurementSchema>> timeseriesSchemas) { this.owner = owner; this.prefixPaths = prefixPaths; this.timeseriesSchemas = timeseriesSchemas; @@ -53,7 +54,7 @@ public class PullMeasurementSchemaHandler implements AsyncMethodCallback<PullSch public void onComplete(PullSchemaResp response) { ByteBuffer buffer = response.schemaBytes; int size = buffer.getInt(); - List<MeasurementSchema> schemas = new ArrayList<>(size); + List<IMeasurementSchema> schemas = new ArrayList<>(size); for (int i = 0; i < size; i++) { schemas.add(MeasurementSchema.deserializeFrom(buffer)); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java index 608c8a0..52c3198 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java @@ -50,6 +50,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; @@ -88,7 +89,7 @@ public class SyncClientAdaptorTest { private ByteBuffer peekNextNotNullValueResult; private Map<Integer, SimpleSnapshot> snapshotMap; private ByteBuffer lastResult; - private List<MeasurementSchema> measurementSchemas; + private List<IMeasurementSchema> measurementSchemas; private List<TimeseriesSchema> timeseriesSchemas; private List<String> paths; @@ -196,7 +197,7 @@ public class SyncClientAdaptorTest { PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { ByteBuffer byteBuffer = ByteBuffer.allocate(4096); byteBuffer.putInt(measurementSchemas.size()); - for (MeasurementSchema schema : measurementSchemas) { + for (IMeasurementSchema schema : measurementSchemas) { schema.serializeTo(byteBuffer); } byteBuffer.flip(); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java index e7b1b7b..eafe889 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java @@ -43,7 +43,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.junit.After; import org.junit.Before; @@ -133,7 +133,7 @@ public abstract class IoTDBTest { private void createTimeSeries(int sgNum, int seriesNum) { try { - MeasurementSchema schema = TestUtils.getTestMeasurementSchema(seriesNum); + IMeasurementSchema schema = TestUtils.getTestMeasurementSchema(seriesNum); planExecutor.processNonQuery( new CreateTimeSeriesPlan( new PartialPath( diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java index 53becb4..3bd5a4e 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java @@ -53,6 +53,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; @@ -186,7 +187,7 @@ public class TestUtils { return "s" + seriesNum; } - public static MeasurementSchema getTestMeasurementSchema(int seriesNum) { + public static IMeasurementSchema getTestMeasurementSchema(int seriesNum) { TSDataType dataType = TSDataType.DOUBLE; TSEncoding encoding = IoTDBDescriptor.getInstance().getConfig().getDefaultDoubleEncoding(); return new MeasurementSchema( @@ -200,7 +201,7 @@ public class TestUtils { public static MeasurementMNode getTestMeasurementMNode(int seriesNum) { TSDataType dataType = TSDataType.DOUBLE; TSEncoding encoding = IoTDBDescriptor.getInstance().getConfig().getDefaultDoubleEncoding(); - MeasurementSchema measurementSchema = + IMeasurementSchema measurementSchema = new MeasurementSchema( TestUtils.getTestMeasurement(seriesNum), dataType, @@ -386,7 +387,7 @@ public class TestUtils { file.getParentFile().mkdirs(); try (TsFileWriter writer = new TsFileWriter(file)) { for (int k = 0; k < seriesNum; k++) { - MeasurementSchema schema = getTestMeasurementSchema(k); + IMeasurementSchema schema = getTestMeasurementSchema(k); writer.registerTimeseries(new Path(getTestSg(sgNum), schema.getMeasurementId()), schema); } @@ -394,7 +395,7 @@ public class TestUtils { long timestamp = i * ptNum + j; TSRecord record = new TSRecord(timestamp, getTestSg(sgNum)); for (int k = 0; k < seriesNum; k++) { - MeasurementSchema schema = getTestMeasurementSchema(k); + IMeasurementSchema schema = getTestMeasurementSchema(k); DataPoint dataPoint = DataPoint.getDataPoint( schema.getType(), schema.getMeasurementId(), String.valueOf(k)); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/PullMeasurementSchemaHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/PullMeasurementSchemaHandlerTest.java index bd23a0d..45c7042 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/PullMeasurementSchemaHandlerTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/PullMeasurementSchemaHandlerTest.java @@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.common.TestException; import org.apache.iotdb.cluster.common.TestUtils; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.junit.Test; @@ -44,8 +44,8 @@ public class PullMeasurementSchemaHandlerTest { public void testComplete() throws InterruptedException { Node owner = TestUtils.getNode(1); String prefixPath = "root"; - AtomicReference<List<MeasurementSchema>> result = new AtomicReference<>(); - List<MeasurementSchema> measurementSchemas = new ArrayList<>(); + AtomicReference<List<IMeasurementSchema>> result = new AtomicReference<>(); + List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); for (int i = 0; i < 10; i++) { measurementSchemas.add(TestUtils.getTestMeasurementSchema(i)); } @@ -59,7 +59,7 @@ public class PullMeasurementSchemaHandlerTest { DataOutputStream dataOutputStream = new DataOutputStream(outputStream); try { dataOutputStream.writeInt(measurementSchemas.size()); - for (MeasurementSchema measurementSchema : measurementSchemas) { + for (IMeasurementSchema measurementSchema : measurementSchemas) { measurementSchema.serializeTo(dataOutputStream); } } catch (IOException e) { @@ -79,7 +79,7 @@ public class PullMeasurementSchemaHandlerTest { public void testError() throws InterruptedException { Node owner = TestUtils.getNode(1); String prefixPath = "root"; - AtomicReference<List<MeasurementSchema>> result = new AtomicReference<>(); + AtomicReference<List<IMeasurementSchema>> result = new AtomicReference<>(); PullMeasurementSchemaHandler handler = new PullMeasurementSchemaHandler(owner, Collections.singletonList(prefixPath), result); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index 84d069a..cd3d8df 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@ -83,7 +83,6 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.apache.thrift.async.AsyncMethodCallback; @@ -612,7 +611,7 @@ public class DataGroupMemberTest extends MemberTest { PullSchemaRequest request = new PullSchemaRequest(); request.setPrefixPaths(Collections.singletonList(TestUtils.getTestSg(0))); - AtomicReference<List<MeasurementSchema>> result = new AtomicReference<>(); + AtomicReference<List<IMeasurementSchema>> result = new AtomicReference<>(); PullMeasurementSchemaHandler handler = new PullMeasurementSchemaHandler(TestUtils.getNode(1), request.getPrefixPaths(), result); new DataAsyncService(dataGroupMember).pullMeasurementSchema(request, handler); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 08914d1..010ebb6 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@ -98,7 +98,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.apache.thrift.async.AsyncMethodCallback; @@ -277,7 +277,7 @@ public class MetaGroupMemberTest extends MemberTest { } private PullSchemaResp mockedPullTimeSeriesSchema(PullSchemaRequest request) { - List<MeasurementSchema> schemas = new ArrayList<>(); + List<IMeasurementSchema> schemas = new ArrayList<>(); List<String> prefixPaths = request.getPrefixPaths(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); @@ -286,7 +286,7 @@ public class MetaGroupMemberTest extends MemberTest { if (!prefixPath.equals(TestUtils.getTestSeries(10, 0))) { IoTDB.metaManager.collectSeries(new PartialPath(prefixPath), schemas); dataOutputStream.writeInt(schemas.size()); - for (MeasurementSchema schema : schemas) { + for (IMeasurementSchema schema : schemas) { schema.serializeTo(dataOutputStream); } } else { @@ -856,7 +856,7 @@ public class MetaGroupMemberTest extends MemberTest { insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]); for (int i = 0; i < 10; i++) { insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(i))); - MeasurementSchema schema = TestUtils.getTestMeasurementSchema(0); + IMeasurementSchema schema = TestUtils.getTestMeasurementSchema(0); try { IoTDB.metaManager.createTimeseries( new PartialPath(schema.getMeasurementId()), @@ -912,7 +912,7 @@ public class MetaGroupMemberTest extends MemberTest { for (int i = 0; i < 10; i++) { insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(i))); - MeasurementSchema schema = TestUtils.getTestMeasurementSchema(0); + IMeasurementSchema schema = TestUtils.getTestMeasurementSchema(0); try { IoTDB.metaManager.createTimeseries( new PartialPath(schema.getMeasurementId()), diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index 9397af1..5366f43 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; @@ -110,7 +110,7 @@ public class MemTableFlushTask { for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) { long startTime = System.currentTimeMillis(); IWritableMemChunk series = iWritableMemChunkEntry.getValue(); - MeasurementSchema desc = series.getSchema(); + IMeasurementSchema desc = series.getSchema(); TVList tvList = series.getSortedTVListForFlush(); sortTime += System.currentTimeMillis() - startTime; encodingTaskQueue.put(new Pair<>(tvList, desc)); @@ -233,8 +233,8 @@ public class MemTableFlushTask { break; } else { long starTime = System.currentTimeMillis(); - Pair<TVList, MeasurementSchema> encodingMessage = - (Pair<TVList, MeasurementSchema>) task; + Pair<TVList, IMeasurementSchema> encodingMessage = + (Pair<TVList, IMeasurementSchema>) task; IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right); writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType()); seriesWriter.sealCurrentPage(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java index 5ba50d0..4aeb66c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.flush; import org.apache.iotdb.db.engine.memtable.AbstractMemTable; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; /** * Only used in sync flush and async close to start a flush task This memtable is not managed by @@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; public class NotifyFlushMemTable extends AbstractMemTable { @Override - protected IWritableMemChunk genMemSeries(MeasurementSchema schema) { + protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 21c9bf2..3538aa1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -33,7 +33,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.TimeRange; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; import java.util.HashMap; @@ -93,7 +93,7 @@ public abstract class AbstractMemTable implements IMemTable { } private IWritableMemChunk createIfNotExistAndGet( - String deviceId, String measurement, MeasurementSchema schema) { + String deviceId, String measurement, IMeasurementSchema schema) { Map<String, IWritableMemChunk> memSeries = memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>()); @@ -106,7 +106,7 @@ public abstract class AbstractMemTable implements IMemTable { }); } - protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema); + protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema); @Override public void insert(InsertRowPlan insertRowPlan) { @@ -157,7 +157,7 @@ public abstract class AbstractMemTable implements IMemTable { public void write( String deviceId, String measurement, - MeasurementSchema schema, + IMeasurementSchema schema, long insertTime, Object objectValue) { IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, measurement, schema); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index ce412a2..b3fb16a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -28,7 +28,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.TimeRange; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; import java.util.List; @@ -49,7 +49,7 @@ public interface IMemTable { void write( String deviceId, String measurement, - MeasurementSchema schema, + IMeasurementSchema schema, long insertTime, Object objectValue); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java index bdb4bbc..622d864 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.memtable; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; public interface IWritableMemChunk { @@ -56,7 +56,7 @@ public interface IWritableMemChunk { long count(); - MeasurementSchema getSchema(); + IMeasurementSchema getSchema(); /** * served for query requests. diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java index 254d722..30d1a49 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.engine.memtable; import org.apache.iotdb.db.rescon.TVListAllocator; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import java.util.HashMap; import java.util.Map; @@ -38,7 +38,7 @@ public class PrimitiveMemTable extends AbstractMemTable { } @Override - protected IWritableMemChunk genMemSeries(MeasurementSchema schema) { + protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) { return new WritableMemChunk(schema, TVListAllocator.getInstance().allocate(schema.getType())); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java index 4536896..550c173 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java @@ -22,14 +22,14 @@ import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; public class WritableMemChunk implements IWritableMemChunk { - private MeasurementSchema schema; + private IMeasurementSchema schema; private TVList list; - public WritableMemChunk(MeasurementSchema schema, TVList list) { + public WritableMemChunk(IMeasurementSchema schema, TVList list) { this.schema = schema; this.list = list; } @@ -188,7 +188,7 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public MeasurementSchema getSchema() { + public IMeasurementSchema getSchema() { return schema; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java index bd1ea17..9853864 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java @@ -32,7 +32,7 @@ import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import java.io.IOException; @@ -61,9 +61,9 @@ public class MergeResource { private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>(); private Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new HashMap<>(); private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>(); - private Map<PartialPath, MeasurementSchema> measurementSchemaMap = + private Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>(); // is this too waste? - private Map<MeasurementSchema, IChunkWriter> chunkWriterCache = new ConcurrentHashMap<>(); + private Map<IMeasurementSchema, IChunkWriter> chunkWriterCache = new ConcurrentHashMap<>(); private long timeLowerBound = Long.MIN_VALUE; @@ -103,7 +103,7 @@ public class MergeResource { chunkWriterCache.clear(); } - public MeasurementSchema getSchema(PartialPath path) { + public IMeasurementSchema getSchema(PartialPath path) { return measurementSchemaMap.get(path); } @@ -171,7 +171,7 @@ public class MergeResource { * Construct the a new or get an existing ChunkWriter of a measurement. Different timeseries of * the same measurement and data type shares the same instance. */ - public IChunkWriter getChunkWriter(MeasurementSchema measurementSchema) { + public IChunkWriter getChunkWriter(IMeasurementSchema measurementSchema) { return chunkWriterCache.computeIfAbsent(measurementSchema, ChunkWriterImpl::new); } @@ -256,7 +256,7 @@ public class MergeResource { this.cacheDeviceMeta = cacheDeviceMeta; } - public void setMeasurementSchemaMap(Map<PartialPath, MeasurementSchema> measurementSchemaMap) { + public void setMeasurementSchemaMap(Map<PartialPath, IMeasurementSchema> measurementSchemaMap) { this.measurementSchemaMap = measurementSchemaMap; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java index 70f24d5..0dcf205 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java @@ -40,7 +40,7 @@ import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -297,7 +297,7 @@ public class MergeMultiChunkTask { RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(currTsFile); for (PartialPath path : currMergingPaths) { - MeasurementSchema schema = resource.getSchema(path); + IMeasurementSchema schema = resource.getSchema(path); mergeFileWriter.addSchema(path, schema); } // merge unseq data with seq data in this file or small chunks in this file into a larger chunk @@ -619,7 +619,7 @@ public class MergeMultiChunkTask { while (!chunkIdxHeap.isEmpty()) { int pathIdx = chunkIdxHeap.poll(); PartialPath path = currMergingPaths.get(pathIdx); - MeasurementSchema measurementSchema = resource.getSchema(path); + IMeasurementSchema measurementSchema = resource.getSchema(path); IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema); if (Thread.interrupted()) { Thread.currentThread().interrupt(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java index ec96335..cb98e41 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java @@ -29,7 +29,7 @@ import org.apache.iotdb.db.metadata.mnode.MNode; import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.MergeUtils; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,7 +146,7 @@ public class MergeTask implements Callable<Void> { mergeLogger.logFiles(resource); Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName)); - Map<PartialPath, MeasurementSchema> measurementSchemaMap = new HashMap<>(); + Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>(); List<PartialPath> unmergedSeries = new ArrayList<>(); for (PartialPath device : devices) { MNode deviceNode = IoTDB.metaManager.getNodeByPath(device); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index b5b76df..1f9d80e 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -75,7 +75,7 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.apache.commons.io.FileUtils; @@ -1609,7 +1609,7 @@ public class StorageGroupProcessor { (timeFilter == null ? "null" : timeFilter)); } - MeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(deviceId, measurementId); + IMeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(deviceId, measurementId); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); long timeLowerBound = diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java index ebfa2c6..853436c 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java @@ -24,7 +24,7 @@ import org.apache.iotdb.db.qp.physical.sys.MNodePlan; import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan; import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -228,7 +228,7 @@ public class MLogTxtWriter implements AutoCloseable { if (plan.getAlias() != null) { s.append(plan.getAlias()); } - MeasurementSchema schema = plan.getSchema(); + IMeasurementSchema schema = plan.getSchema(); s.append(",").append(schema.getType().ordinal()).append(","); s.append(schema.getEncodingType().ordinal()).append(","); s.append(schema.getCompressor().ordinal()).append(","); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index c6bcb78..f6bb7ab 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -69,6 +69,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; @@ -908,7 +909,7 @@ public class MManager { try { Pair<Map<String, String>, Map<String, String>> tagAndAttributePair = tagLogFile.read(config.getTagAttributeTotalSize(), leaf.getOffset()); - MeasurementSchema measurementSchema = leaf.getSchema(); + IMeasurementSchema measurementSchema = leaf.getSchema(); res.add( new ShowTimeSeriesResult( leaf.getFullPath(), @@ -996,7 +997,7 @@ public class MManager { return res; } - public MeasurementSchema getSeriesSchema(PartialPath device, String measurement) + public IMeasurementSchema getSeriesSchema(PartialPath device, String measurement) throws MetadataException { MNode node = mtree.getNodeByPath(device); MNode leaf = node.getChild(measurement); @@ -1654,7 +1655,7 @@ public class MManager { while (!nodeDeque.isEmpty()) { MNode node = nodeDeque.removeFirst(); if (node instanceof MeasurementMNode) { - MeasurementSchema nodeSchema = ((MeasurementMNode) node).getSchema(); + IMeasurementSchema nodeSchema = ((MeasurementMNode) node).getSchema(); timeseriesSchemas.add( new TimeseriesSchema( node.getFullPath(), @@ -1673,13 +1674,13 @@ public class MManager { } public void collectMeasurementSchema( - MNode startingNode, Collection<MeasurementSchema> measurementSchemas) { + MNode startingNode, Collection<IMeasurementSchema> measurementSchemas) { Deque<MNode> nodeDeque = new ArrayDeque<>(); nodeDeque.addLast(startingNode); while (!nodeDeque.isEmpty()) { MNode node = nodeDeque.removeFirst(); if (node instanceof MeasurementMNode) { - MeasurementSchema nodeSchema = ((MeasurementMNode) node).getSchema(); + IMeasurementSchema nodeSchema = ((MeasurementMNode) node).getSchema(); measurementSchemas.add( new MeasurementSchema( node.getName(), @@ -1693,7 +1694,7 @@ public class MManager { } /** Collect the timeseries schemas under "startingPath". */ - public void collectSeries(PartialPath startingPath, List<MeasurementSchema> measurementSchemas) { + public void collectSeries(PartialPath startingPath, List<IMeasurementSchema> measurementSchemas) { MNode mNode; try { mNode = getNodeByPath(startingPath); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 39a5088..762e2a8 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -41,7 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -496,7 +496,7 @@ public class MTree implements Serializable { /** * Get measurement schema for a given path. Path must be a complete Path from root to leaf node. */ - MeasurementSchema getSchema(PartialPath path) throws MetadataException { + IMeasurementSchema getSchema(PartialPath path) throws MetadataException { MeasurementMNode node = (MeasurementMNode) getNodeByPath(path); return node.getSchema(); } @@ -1082,7 +1082,7 @@ public class MTree implements Serializable { PartialPath nodePath = node.getPartialPath(); String[] tsRow = new String[7]; tsRow[0] = ((MeasurementMNode) node).getAlias(); - MeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema(); + IMeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema(); tsRow[1] = getStorageGroupPath(nodePath).getFullPath(); tsRow[2] = measurementSchema.getType().toString(); tsRow[3] = measurementSchema.getEncodingType().toString(); @@ -1193,7 +1193,7 @@ public class MTree implements Serializable { * <p>e.g., MTree has [root.sg1.d1.s1, root.sg1.d1.s2, root.sg1.d2.s1] given path = root.sg1.d1 * return [s1, s2] * - * @param partial Path + * @param path Path * @return All child nodes' seriesPath(s) of given seriesPath. */ Set<String> getChildNodeInNextLevel(PartialPath path) throws MetadataException { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MeasurementMeta.java b/server/src/main/java/org/apache/iotdb/db/metadata/MeasurementMeta.java index 7963c7b..59a5068 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MeasurementMeta.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MeasurementMeta.java @@ -19,34 +19,34 @@ package org.apache.iotdb.db.metadata; import org.apache.iotdb.tsfile.read.TimeValuePair; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; public class MeasurementMeta { - private MeasurementSchema measurementSchema = null; + private IMeasurementSchema measurementSchema; private String alias = null; // TODO get schema by alias private TimeValuePair timeValuePair = null; public MeasurementMeta( - MeasurementSchema measurementSchema, String alias, TimeValuePair timeValuePair) { + IMeasurementSchema measurementSchema, String alias, TimeValuePair timeValuePair) { this.measurementSchema = measurementSchema; this.alias = alias; this.timeValuePair = timeValuePair; } - public MeasurementMeta(MeasurementSchema measurementSchema, String alias) { + public MeasurementMeta(IMeasurementSchema measurementSchema, String alias) { this.measurementSchema = measurementSchema; this.alias = alias; } - public MeasurementMeta(MeasurementSchema measurementSchema) { + public MeasurementMeta(IMeasurementSchema measurementSchema) { this.measurementSchema = measurementSchema; } - public MeasurementSchema getMeasurementSchema() { + public IMeasurementSchema getMeasurementSchema() { return measurementSchema; } - public void setMeasurementSchema(MeasurementSchema measurementSchema) { + public void setMeasurementSchema(IMeasurementSchema measurementSchema) { this.measurementSchema = measurementSchema; } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java index 5f096bd..008bbe2 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java @@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.io.IOException; @@ -36,7 +37,7 @@ public class MeasurementMNode extends MNode { private static final long serialVersionUID = -1199657856921206435L; /** measurement's Schema for one timeseries represented by current leaf node */ - private MeasurementSchema schema; + private IMeasurementSchema schema; private String alias; // tag/attribute's start offset in tag file @@ -59,13 +60,13 @@ public class MeasurementMNode extends MNode { } public MeasurementMNode( - MNode parent, String measurementName, MeasurementSchema schema, String alias) { + MNode parent, String measurementName, IMeasurementSchema schema, String alias) { super(parent, measurementName); this.schema = schema; this.alias = alias; } - public MeasurementSchema getSchema() { + public IMeasurementSchema getSchema() { return schema; } @@ -119,7 +120,7 @@ public class MeasurementMNode extends MNode { this.alias = alias; } - public void setSchema(MeasurementSchema schema) { + public void setSchema(IMeasurementSchema schema) { this.schema = schema; } @@ -147,7 +148,7 @@ public class MeasurementMNode extends MNode { props.put(propInfo.split(":")[0], propInfo.split(":")[1]); } } - MeasurementSchema schema = + IMeasurementSchema schema = new MeasurementSchema( name, Byte.parseByte(nodeInfo[3]), diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 54a0bf7..41d665f 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -134,7 +134,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import java.io.File; @@ -955,7 +955,7 @@ public class PlanExecutor implements IPlanExecutor { String.format( "Cannot load file %s because the file has crashed.", file.getAbsolutePath())); } - Map<Path, MeasurementSchema> schemaMap = new HashMap<>(); + Map<Path, IMeasurementSchema> schemaMap = new HashMap<>(); List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>(); try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) { @@ -985,7 +985,7 @@ public class PlanExecutor implements IPlanExecutor { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning private void createSchemaAutomatically( List<ChunkGroupMetadata> chunkGroupMetadataList, - Map<Path, MeasurementSchema> knownSchemas, + Map<Path, IMeasurementSchema> knownSchemas, int sgLevel) throws QueryProcessException, MetadataException { if (chunkGroupMetadataList.isEmpty()) { @@ -1005,7 +1005,7 @@ public class PlanExecutor implements IPlanExecutor { + chunkMetadata.getMeasurementUid()); if (!registeredSeries.contains(series)) { registeredSeries.add(series); - MeasurementSchema schema = + IMeasurementSchema schema = knownSchemas.get(new Path(series.getDevice(), series.getMeasurement())); if (schema == null) { throw new MetadataException( diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java index cee286a..a46f8f7 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.io.DataOutputStream; @@ -31,7 +32,7 @@ import java.util.List; import java.util.Objects; public class MeasurementMNodePlan extends MNodePlan { - private MeasurementSchema schema; + private IMeasurementSchema schema; private String alias; private long offset; @@ -40,7 +41,7 @@ public class MeasurementMNodePlan extends MNodePlan { } public MeasurementMNodePlan( - String name, String alias, long offset, int childSize, MeasurementSchema schema) { + String name, String alias, long offset, int childSize, IMeasurementSchema schema) { super(false, Operator.OperatorType.MEASUREMENT_MNODE); this.name = name; this.alias = alias; @@ -91,11 +92,11 @@ public class MeasurementMNodePlan extends MNodePlan { index = buffer.getLong(); } - public MeasurementSchema getSchema() { + public IMeasurementSchema getSchema() { return schema; } - public void setSchema(MeasurementSchema schema) { + public void setSchema(IMeasurementSchema schema) { this.schema = schema; } diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java index b7e0842..93867cf 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java @@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2; import org.apache.iotdb.tsfile.v2.read.reader.page.PageReaderV2; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -85,7 +86,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { * file metadata size. Then the reader will skip the first TSFileConfig.OLD_MAGIC_STRING.length() * bytes of the file for preparing reading real data. * - * @param file the data file + * @param resourceToBeUpgraded resource to be updated * @throws IOException If some I/O error occurs */ public TsFileOnlineUpgradeTool(TsFileResource resourceToBeUpgraded) throws IOException { @@ -103,7 +104,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { /** * upgrade a single TsFile * - * @param tsFileName old version tsFile's absolute path + * @param resourceToBeUpgraded resource to be updated * @param upgradedResources new version tsFiles' resources */ public static void upgradeOneTsfile( @@ -143,7 +144,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>(); List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>(); byte marker; - List<MeasurementSchema> measurementSchemaList = new ArrayList<>(); + List<IMeasurementSchema> measurementSchemaList = new ArrayList<>(); try { while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { @@ -153,7 +154,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { newChunkGroup = false; } ChunkHeader header = reader.readChunkHeader(); - MeasurementSchema measurementSchema = + IMeasurementSchema measurementSchema = new MeasurementSchema( header.getMeasurementID(), header.getDataType(), @@ -297,14 +298,14 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { */ private void rewrite( String deviceId, - List<MeasurementSchema> schemas, + List<IMeasurementSchema> schemas, List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup, List<List<Boolean>> needToDecodeInfoInChunkGroup) throws IOException, PageException { - Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>(); + Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>(); for (int i = 0; i < schemas.size(); i++) { - MeasurementSchema schema = schemas.get(i); + IMeasurementSchema schema = schemas.get(i); List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i); List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i); List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i); @@ -326,7 +327,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { } } - for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : + for (Entry<Long, Map<IMeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup.entrySet()) { long partitionId = entry.getKey(); TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId); @@ -375,15 +376,15 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { private void writePageInToFile( File oldTsFile, - MeasurementSchema schema, + IMeasurementSchema schema, PageHeader pageHeader, ByteBuffer pageData, - Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup, + Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup, boolean isOnlyOnePageChunk) throws PageException { long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime()); getOrDefaultTsFileIOWriter(oldTsFile, partitionId); - Map<MeasurementSchema, ChunkWriterImpl> chunkWriters = + Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>()); ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema)); chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader, isOnlyOnePageChunk); @@ -393,9 +394,9 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { private void decodeAndWritePageInToFiles( File oldTsFile, - MeasurementSchema schema, + IMeasurementSchema schema, ByteBuffer pageData, - Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) + Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) throws IOException { valueDecoder.reset(); PageReaderV2 pageReader = @@ -406,7 +407,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable { Object value = batchData.currentValue(); long partitionId = StorageEngine.getTimePartition(time); - Map<MeasurementSchema, ChunkWriterImpl> chunkWriters = + Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>()); ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema)); getOrDefaultTsFileIOWriter(oldTsFile, partitionId); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java index f040c6b..92e5f3a 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; @@ -114,7 +115,7 @@ public class SchemaUtils { TSDataType dataType = schema.getType(); TSEncoding encoding = schema.getEncodingType(); CompressionType compressionType = schema.getCompressor(); - MeasurementSchema measurementSchema = + IMeasurementSchema measurementSchema = new MeasurementSchema(path.getMeasurement(), dataType, encoding, compressionType); MeasurementMNode measurementMNode = diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java index 4a699e0..2a9d808 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java @@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.io.File; @@ -167,7 +168,7 @@ public class SeriesReaderTestUtil { List<String> deviceIds) throws IOException, WriteProcessException { TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile()); - Map<String, MeasurementSchema> template = new HashMap<>(); + Map<String, IMeasurementSchema> template = new HashMap<>(); for (MeasurementSchema measurementSchema : measurementSchemas) { template.put(measurementSchema.getMeasurementId(), measurementSchema); } diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java index 0dae987..f24f023 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java @@ -48,6 +48,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.Schema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; @@ -113,7 +114,7 @@ public class SeqTsFileRecoverTest { } Schema schema = new Schema(); - Map<String, MeasurementSchema> template = new HashMap<>(); + Map<String, IMeasurementSchema> template = new HashMap<>(); for (int i = 0; i < 10; i++) { template.put( "sensor" + i, new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN)); diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index b873ade..cfeb6c0 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -42,7 +42,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1008,7 +1008,7 @@ public class Session { TSInsertTabletReq request = new TSInsertTabletReq(); request.setDeviceId(tablet.deviceId); - for (MeasurementSchema measurementSchema : tablet.getSchemas()) { + for (IMeasurementSchema measurementSchema : tablet.getSchemas()) { request.addToMeasurements(measurementSchema.getMeasurementId()); request.addToTypes(measurementSchema.getType().ordinal()); } @@ -1109,7 +1109,7 @@ public class Session { request.addToDeviceIds(tablet.deviceId); List<String> measurements = new ArrayList<>(); List<Integer> dataTypes = new ArrayList<>(); - for (MeasurementSchema measurementSchema : tablet.getSchemas()) { + for (IMeasurementSchema measurementSchema : tablet.getSchemas()) { measurements.add(measurementSchema.getMeasurementId()); dataTypes.add(measurementSchema.getType().ordinal()); } diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala index 1291796..503f24f 100644 --- a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala +++ b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala @@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.read.expression.{IExpression, QueryExpression} import org.apache.iotdb.tsfile.read.filter.{TimeFilter, ValueFilter} import org.apache.iotdb.tsfile.write.record.TSRecord import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint -import org.apache.iotdb.tsfile.write.schema.{MeasurementSchema, Schema} +import org.apache.iotdb.tsfile.write.schema.{IMeasurementSchema, MeasurementSchema, Schema} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -527,7 +527,7 @@ object NarrowConverter extends Converter { * @param options encoding options * @return MeasurementSchema */ - def getSeriesSchema(field: StructField, options: Map[String, String]): MeasurementSchema = { + def getSeriesSchema(field: StructField, options: Map[String, String]): IMeasurementSchema = { val dataType = getTsDataType(field.dataType) val encodingStr = dataType match { case TSDataType.BOOLEAN => options.getOrElse(QueryConstant.BOOLEAN, TSEncoding.PLAIN.toString) diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala index 293bb65..e7a8e42 100755 --- a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala +++ b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala @@ -35,10 +35,11 @@ import org.apache.iotdb.tsfile.read.filter.{TimeFilter, ValueFilter} import org.apache.iotdb.tsfile.utils.Binary import org.apache.iotdb.tsfile.write.record.TSRecord import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint -import org.apache.iotdb.tsfile.write.schema.{MeasurementSchema, Schema} +import org.apache.iotdb.tsfile.write.schema.{IMeasurementSchema, MeasurementSchema, Schema} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ + import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -421,7 +422,7 @@ object WideConverter extends Converter { * @param options encoding options * @return MeasurementSchema */ - def getSeriesSchema(field: StructField, options: Map[String, String]): MeasurementSchema = { + def getSeriesSchema(field: StructField, options: Map[String, String]): IMeasurementSchema = { val dataType = getTsDataType(field.dataType) val encodingStr = dataType match { case TSDataType.BOOLEAN => options.getOrElse(QueryConstant.BOOLEAN, TSEncoding.PLAIN.toString) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 256288f..af2829a 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -48,6 +48,7 @@ import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.BloomFilter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; @@ -944,7 +945,7 @@ public class TsFileSequenceReader implements AutoCloseable { */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public long selfCheck( - Map<Path, MeasurementSchema> newSchema, + Map<Path, IMeasurementSchema> newSchema, List<ChunkGroupMetadata> chunkGroupMetadataList, boolean fastFinish) throws IOException { @@ -985,7 +986,7 @@ public class TsFileSequenceReader implements AutoCloseable { long truncatedSize = headerLength; byte marker; String lastDeviceId = null; - List<MeasurementSchema> measurementSchemaList = new ArrayList<>(); + List<IMeasurementSchema> measurementSchemaList = new ArrayList<>(); try { while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { @@ -997,7 +998,7 @@ public class TsFileSequenceReader implements AutoCloseable { // insertion is not tolerable ChunkHeader chunkHeader = this.readChunkHeader(marker); measurementID = chunkHeader.getMeasurementID(); - MeasurementSchema measurementSchema = + IMeasurementSchema measurementSchema = new MeasurementSchema( measurementID, chunkHeader.getDataType(), @@ -1076,7 +1077,7 @@ public class TsFileSequenceReader implements AutoCloseable { if (lastDeviceId != null) { // schema of last chunk group if (newSchema != null) { - for (MeasurementSchema tsSchema : measurementSchemaList) { + for (IMeasurementSchema tsSchema : measurementSchemaList) { newSchema.putIfAbsent( new Path(lastDeviceId, tsSchema.getMeasurementId()), tsSchema); } @@ -1095,7 +1096,7 @@ public class TsFileSequenceReader implements AutoCloseable { if (lastDeviceId != null) { // schema of last chunk group if (newSchema != null) { - for (MeasurementSchema tsSchema : measurementSchemaList) { + for (IMeasurementSchema tsSchema : measurementSchemaList) { newSchema.putIfAbsent( new Path(lastDeviceId, tsSchema.getMeasurementId()), tsSchema); } @@ -1118,7 +1119,7 @@ public class TsFileSequenceReader implements AutoCloseable { if (lastDeviceId != null) { // schema of last chunk group if (newSchema != null) { - for (MeasurementSchema tsSchema : measurementSchemaList) { + for (IMeasurementSchema tsSchema : measurementSchemaList) { newSchema.putIfAbsent(new Path(lastDeviceId, tsSchema.getMeasurementId()), tsSchema); } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java index 8ce3f01..69bbab8 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java @@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.write.chunk.IChunkGroupWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.Schema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -150,7 +150,8 @@ public class TsFileWriter implements AutoCloseable { } } - public void registerDeviceTemplate(String templateName, Map<String, MeasurementSchema> template) { + public void registerDeviceTemplate( + String templateName, Map<String, IMeasurementSchema> template) { schema.registerDeviceTemplate(templateName, template); } @@ -158,7 +159,7 @@ public class TsFileWriter implements AutoCloseable { schema.registerDevice(deviceId, templateName); } - public void registerTimeseries(Path path, MeasurementSchema measurementSchema) + public void registerTimeseries(Path path, IMeasurementSchema measurementSchema) throws WriteProcessException { if (schema.containsTimeseries(path)) { throw new WriteProcessException("given timeseries has exists! " + path); @@ -190,7 +191,7 @@ public class TsFileWriter implements AutoCloseable { groupWriter.tryToAddSeriesWriter(schema.getSeriesSchema(path), pageSize); } else if (schema.getDeviceTemplates() != null && schema.getDeviceTemplates().size() == 1) { // use the default template without needing to register device - Map<String, MeasurementSchema> template = + Map<String, IMeasurementSchema> template = schema.getDeviceTemplates().entrySet().iterator().next().getValue(); if (template.containsKey(path.getMeasurement())) { groupWriter.tryToAddSeriesWriter(template.get(path.getMeasurement()), pageSize); @@ -220,14 +221,14 @@ public class TsFileWriter implements AutoCloseable { String deviceId = tablet.deviceId; // add all SeriesWriter of measurements in this Tablet to this ChunkGroupWriter - for (MeasurementSchema timeseries : tablet.getSchemas()) { + for (IMeasurementSchema timeseries : tablet.getSchemas()) { String measurementId = timeseries.getMeasurementId(); Path path = new Path(deviceId, measurementId); if (schema.containsTimeseries(path)) { groupWriter.tryToAddSeriesWriter(schema.getSeriesSchema(path), pageSize); } else if (schema.getDeviceTemplates() != null && schema.getDeviceTemplates().size() == 1) { // use the default template without needing to register device - Map<String, MeasurementSchema> template = + Map<String, IMeasurementSchema> template = schema.getDeviceTemplates().entrySet().iterator().next().getValue(); if (template.containsKey(path.getMeasurement())) { groupWriter.tryToAddSeriesWriter(template.get(path.getMeasurement()), pageSize); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java index 1dbc9f5..b0ef852 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java @@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -51,7 +52,7 @@ public class ChunkGroupWriterImpl implements IChunkGroupWriter { } @Override - public void tryToAddSeriesWriter(MeasurementSchema schema, int pageSizeThreshold) { + public void tryToAddSeriesWriter(IMeasurementSchema schema, int pageSizeThreshold) { if (!chunkWriters.containsKey(schema.getMeasurementId())) { IChunkWriter seriesWriter = new ChunkWriterImpl(schema); this.chunkWriters.put(schema.getMeasurementId(), seriesWriter); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java index 2c00f4a..c3e7bf8 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java @@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.iotdb.tsfile.write.page.PageWriter; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; @@ -45,7 +45,7 @@ public class ChunkWriterImpl implements IChunkWriter { private static final Logger logger = LoggerFactory.getLogger(ChunkWriterImpl.class); - private MeasurementSchema measurementSchema; + private IMeasurementSchema measurementSchema; private ICompressor compressor; @@ -92,7 +92,7 @@ public class ChunkWriterImpl implements IChunkWriter { private Statistics<?> firstPageStatistics; /** @param schema schema of this measurement */ - public ChunkWriterImpl(MeasurementSchema schema) { + public ChunkWriterImpl(IMeasurementSchema schema) { this.measurementSchema = schema; this.compressor = ICompressor.getCompressor(schema.getCompressor()); this.pageBuffer = new PublicBAOS(); @@ -115,7 +115,7 @@ public class ChunkWriterImpl implements IChunkWriter { checkSdtEncoding(); } - public ChunkWriterImpl(MeasurementSchema schema, boolean isMerging) { + public ChunkWriterImpl(IMeasurementSchema schema, boolean isMerging) { this(schema); this.isMerging = isMerging; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkGroupWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkGroupWriter.java index 0c73fbb..9dee2ad 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkGroupWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkGroupWriter.java @@ -21,7 +21,7 @@ package org.apache.iotdb.tsfile.write.chunk; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; @@ -77,7 +77,7 @@ public interface IChunkGroupWriter { * @param measurementSchema a measurement descriptor containing the message of the series * @param pageSize the specified page size */ - void tryToAddSeriesWriter(MeasurementSchema measurementSchema, int pageSize); + void tryToAddSeriesWriter(IMeasurementSchema measurementSchema, int pageSize); /** * get the serialized size of current chunkGroup header + all chunks. Notice, the value does not diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java index 8467d15..5fa9977 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java @@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +63,7 @@ public class PageWriter { this(null, null); } - public PageWriter(MeasurementSchema measurementSchema) { + public PageWriter(IMeasurementSchema measurementSchema) { this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder()); this.statistics = Statistics.getStatsByType(measurementSchema.getType()); this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor()); @@ -263,7 +263,7 @@ public class PageWriter { } /** reset this page */ - public void reset(MeasurementSchema measurementSchema) { + public void reset(IMeasurementSchema measurementSchema) { timeOut.reset(); valueOut.reset(); statistics = Statistics.getStatsByType(measurementSchema.getType()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java new file mode 100644 index 0000000..e0c3dd7 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.write.schema; + +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +public interface IMeasurementSchema { + + String getMeasurementId(); + + CompressionType getCompressor(); + + TSEncoding getEncodingType(); + + TSDataType getType(); + + TSEncoding getTimeTSEncoding(); + + Encoder getTimeEncoder(); + + Encoder getValueEncoder(); + + Map<String, String> getProps(); + + List<String> getValueMeasurementIdList(); + + List<TSDataType> getValueTSDataTypeList(); + + List<TSEncoding> getValueTSEncodingList(); + + List<Encoder> getValueEncoderList(); + + int serializeTo(ByteBuffer buffer); + + int serializeTo(OutputStream outputStream) throws IOException; +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java index 34ee496..e035787 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java @@ -32,7 +32,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -42,7 +44,8 @@ import java.util.Objects; * MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has * TSDataTypeConverter up to now. */ -public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable { +public class MeasurementSchema + implements IMeasurementSchema, Comparable<MeasurementSchema>, Serializable { public static final MeasurementSchema TIME_SCHEMA = new MeasurementSchema( @@ -170,6 +173,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali return measurementSchema; } + @Override public String getMeasurementId() { return measurementId; } @@ -178,23 +182,32 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali this.measurementId = measurementId; } + @Override public Map<String, String> getProps() { return props; } + @Override public TSEncoding getEncodingType() { return TSEncoding.deserialize(encoding); } + @Override public TSDataType getType() { return TSDataType.deserialize(type); } + @Override + public TSEncoding getTimeTSEncoding() { + return getEncodingType(); + } + public void setProps(Map<String, String> props) { this.props = props; } /** function for getting time encoder. */ + @Override public Encoder getTimeEncoder() { TSEncoding timeEncoding = TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); @@ -203,6 +216,26 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali return TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType); } + @Override + public List<String> getValueMeasurementIdList() { + return Collections.emptyList(); + } + + @Override + public List<TSDataType> getValueTSDataTypeList() { + return Collections.emptyList(); + } + + @Override + public List<TSEncoding> getValueTSEncodingList() { + return Collections.emptyList(); + } + + @Override + public List<Encoder> getValueEncoderList() { + return Collections.emptyList(); + } + /** * get Encoder of value from encodingConverter by measurementID and data type. * @@ -218,11 +251,13 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali return encodingConverter.getEncoder(TSDataType.deserialize(type)); } + @Override public CompressionType getCompressor() { return CompressionType.deserialize(compressor); } /** function for serializing data to output stream. */ + @Override public int serializeTo(OutputStream outputStream) throws IOException { int byteLen = 0; @@ -248,6 +283,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali } /** function for serializing data to byte buffer. */ + @Override public int serializeTo(ByteBuffer buffer) { int byteLen = 0; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java index 84768e7..a01ca7e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java @@ -36,35 +36,36 @@ public class Schema implements Serializable { * Path (device + measurement) -> measurementSchema By default, use the LinkedHashMap to store the * order of insertion */ - private Map<Path, MeasurementSchema> registeredTimeseries; + private Map<Path, IMeasurementSchema> registeredTimeseries; /** template name -> (measuremnet -> MeasurementSchema) */ - private Map<String, Map<String, MeasurementSchema>> deviceTemplates; + private Map<String, Map<String, IMeasurementSchema>> deviceTemplates; public Schema() { this.registeredTimeseries = new LinkedHashMap<>(); } - public Schema(Map<Path, MeasurementSchema> knownSchema) { + public Schema(Map<Path, IMeasurementSchema> knownSchema) { this.registeredTimeseries = knownSchema; } - public void registerTimeseries(Path path, MeasurementSchema descriptor) { + public void registerTimeseries(Path path, IMeasurementSchema descriptor) { this.registeredTimeseries.put(path, descriptor); } - public void registerDeviceTemplate(String templateName, Map<String, MeasurementSchema> template) { + public void registerDeviceTemplate( + String templateName, Map<String, IMeasurementSchema> template) { if (deviceTemplates == null) { deviceTemplates = new HashMap<>(); } this.deviceTemplates.put(templateName, template); } - public void extendTemplate(String templateName, MeasurementSchema descriptor) { + public void extendTemplate(String templateName, IMeasurementSchema descriptor) { if (deviceTemplates == null) { deviceTemplates = new HashMap<>(); } - Map<String, MeasurementSchema> template = + Map<String, IMeasurementSchema> template = this.deviceTemplates.getOrDefault(templateName, new HashMap<>()); template.put(descriptor.getMeasurementId(), descriptor); this.deviceTemplates.put(templateName, template); @@ -74,14 +75,14 @@ public class Schema implements Serializable { if (!deviceTemplates.containsKey(templateName)) { return; } - Map<String, MeasurementSchema> template = deviceTemplates.get(templateName); - for (Map.Entry<String, MeasurementSchema> entry : template.entrySet()) { + Map<String, IMeasurementSchema> template = deviceTemplates.get(templateName); + for (Map.Entry<String, IMeasurementSchema> entry : template.entrySet()) { Path path = new Path(deviceId, entry.getKey()); registerTimeseries(path, entry.getValue()); } } - public MeasurementSchema getSeriesSchema(Path path) { + public IMeasurementSchema getSeriesSchema(Path path) { return registeredTimeseries.get(path); } @@ -92,7 +93,7 @@ public class Schema implements Serializable { return registeredTimeseries.get(path).getType(); } - public Map<String, Map<String, MeasurementSchema>> getDeviceTemplates() { + public Map<String, Map<String, IMeasurementSchema>> getDeviceTemplates() { return deviceTemplates; } @@ -102,7 +103,7 @@ public class Schema implements Serializable { } // for test - public Map<Path, MeasurementSchema> getRegisteredTimeseriesMap() { + public Map<Path, IMeasurementSchema> getRegisteredTimeseriesMap() { return registeredTimeseries; } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index 26f8494..b3f74ac 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.TsFileCheckStatus; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { private static final Logger logger = LoggerFactory.getLogger("FileMonitor"); private long truncatedSize = -1; - private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>(); + private Map<Path, IMeasurementSchema> knownSchemas = new HashMap<>(); private int lastFlushedChunkGroupIndex = 0; @@ -147,7 +147,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { return truncatedSize; } - public Map<Path, MeasurementSchema> getKnownSchema() { + public Map<Path, IMeasurementSchema> getKnownSchema() { return knownSchemas; } @@ -228,7 +228,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { return append; } - public void addSchema(Path path, MeasurementSchema schema) { + public void addSchema(Path path, IMeasurementSchema schema) { knownSchemas.put(path, schema); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 6dfeec8..fd1f1d5 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -39,7 +39,7 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.BytesUtils; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,7 +171,7 @@ public class TsFileIOWriter { * @throws IOException if I/O error occurs */ public void startFlushChunk( - MeasurementSchema measurementSchema, + IMeasurementSchema measurementSchema, CompressionType compressionCodecName, TSDataType tsDataType, TSEncoding encodingType, diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/RecordUtils.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/RecordUtils.java index ab5d463..ccdb6f9 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/RecordUtils.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/RecordUtils.java @@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint; import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint; import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.Schema; import org.slf4j.Logger; @@ -68,7 +68,7 @@ public class RecordUtils { for (int i = 2; i < items.length - 1; i += 2) { // get measurementId and value measurementId = items[i].trim(); - MeasurementSchema measurementSchema = + IMeasurementSchema measurementSchema = schema.getSeriesSchema(new Path(deviceId, measurementId)); if (measurementSchema == null) { LOG.warn("measurementId:{},type not found, pass", measurementId); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/DefaultDeviceTemplateTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/DefaultDeviceTemplateTest.java index b8a54cb..68abcfc 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/DefaultDeviceTemplateTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/DefaultDeviceTemplateTest.java @@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.QueryExpression; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; @@ -53,7 +54,7 @@ public class DefaultDeviceTemplateTest { schemaList.add(s1); schemaList.add(s2); - Map<String, MeasurementSchema> schema = new HashMap<>(); + Map<String, IMeasurementSchema> schema = new HashMap<>(); schema.put("s1", s1); schema.put("s2", s2); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/schema/converter/SchemaBuilderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/schema/converter/SchemaBuilderTest.java index d21a9a6..a8c0638 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/schema/converter/SchemaBuilderTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/schema/converter/SchemaBuilderTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.Schema; @@ -51,12 +52,12 @@ public class SchemaBuilderTest { new MeasurementSchema( "s5", TSDataType.INT32, TSEncoding.TS_2DIFF, CompressionType.UNCOMPRESSED, null)); - Collection<MeasurementSchema> timeseries = schema.getRegisteredTimeseriesMap().values(); + Collection<IMeasurementSchema> timeseries = schema.getRegisteredTimeseriesMap().values(); String[] tsDesStrings = { "[s4,DOUBLE,RLE,{max_point_number=3},SNAPPY]", "[s5,INT32,TS_2DIFF,,UNCOMPRESSED]" }; int i = 0; - for (MeasurementSchema desc : timeseries) { + for (IMeasurementSchema desc : timeseries) { assertEquals(tsDesStrings[i++], desc.toString()); } } @@ -67,7 +68,7 @@ public class SchemaBuilderTest { Map<String, String> props = new HashMap<>(); props.put(JsonFormatConstant.MAX_POINT_NUMBER, "3"); Schema schema = new Schema(); - Map<String, MeasurementSchema> template = new HashMap<>(); + Map<String, IMeasurementSchema> template = new HashMap<>(); template.put( "s4", new MeasurementSchema( @@ -79,12 +80,12 @@ public class SchemaBuilderTest { schema.registerDeviceTemplate("template1", template); schema.registerDevice("d1", "template1"); - Collection<MeasurementSchema> timeseries = schema.getRegisteredTimeseriesMap().values(); + Collection<IMeasurementSchema> timeseries = schema.getRegisteredTimeseriesMap().values(); String[] tsDesStrings = { "[s4,DOUBLE,RLE,{max_point_number=3},SNAPPY]", "[s5,INT32,TS_2DIFF,,UNCOMPRESSED]" }; int i = 0; - for (MeasurementSchema desc : timeseries) { + for (IMeasurementSchema desc : timeseries) { assertEquals(tsDesStrings[i++], desc.toString()); } } @@ -95,7 +96,7 @@ public class SchemaBuilderTest { Map<String, String> props = new HashMap<>(); props.put(JsonFormatConstant.MAX_POINT_NUMBER, "3"); Schema schema = new Schema(); - Map<String, MeasurementSchema> template = new HashMap<>(); + Map<String, IMeasurementSchema> template = new HashMap<>(); template.put( "s4", new MeasurementSchema( @@ -113,14 +114,14 @@ public class SchemaBuilderTest { schema.registerDevice("d1", "template1"); - Collection<MeasurementSchema> timeseries = schema.getRegisteredTimeseriesMap().values(); + Collection<IMeasurementSchema> timeseries = schema.getRegisteredTimeseriesMap().values(); String[] tsDesStrings = { "[s4,DOUBLE,RLE,{max_point_number=3},SNAPPY]", "[s5,INT32,TS_2DIFF,,UNCOMPRESSED]", "[s6,INT64,RLE,{max_point_number=3},SNAPPY]" }; int i = 0; - for (MeasurementSchema desc : timeseries) { + for (IMeasurementSchema desc : timeseries) { assertEquals(tsDesStrings[i++], desc.toString()); } }
