This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch new_vector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 240df9cfbda079bc4c6c85f57f00fc38a7317be3 Merge: 18fe326 c8513e6 Author: JackieTien97 <[email protected]> AuthorDate: Fri Nov 12 09:57:08 2021 +0800 fix conflicts .gitignore | 1 - .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 2 +- cli/pom.xml | 5 + .../main/java/org/apache/iotdb/tool/ExportCsv.java | 86 +- client-cpp/pom.xml | 2 +- cluster/pom.xml | 4 + .../resources/conf/iotdb-cluster.properties | 5 - cluster/src/assembly/resources/sbin/add-node.bat | 2 +- cluster/src/assembly/resources/sbin/add-node.sh | 2 +- .../src/assembly/resources/sbin/remove-node.bat | 4 +- cluster/src/assembly/resources/sbin/remove-node.sh | 2 +- cluster/src/assembly/resources/sbin/start-node.bat | 4 +- cluster/src/assembly/resources/sbin/start-node.sh | 2 +- cluster/src/assembly/resources/sbin/stop-node.bat | 2 +- cluster/src/assembly/resources/sbin/stop-node.sh | 2 +- .../java/org/apache/iotdb/cluster/ClientMain.java | 3 +- .../org/apache/iotdb/cluster/ClusterIoTDB.java | 689 ++++++++ .../apache/iotdb/cluster/ClusterIoTDBMBean.java | 22 +- .../java/org/apache/iotdb/cluster/ClusterMain.java | 331 ---- .../apache/iotdb/cluster/client/BaseFactory.java | 54 + .../iotdb/cluster/client/ClientCategory.java | 22 +- .../apache/iotdb/cluster/client/ClientManager.java | 223 +++ .../iotdb/cluster/client/ClientPoolFactory.java | 98 ++ .../iotdb/cluster/client/DataClientProvider.java | 95 -- .../SyncClientFactory.java => IClientManager.java} | 21 +- .../cluster/client/async/AsyncBaseFactory.java | 68 + .../cluster/client/async/AsyncClientFactory.java | 65 - .../cluster/client/async/AsyncClientPool.java | 216 --- .../cluster/client/async/AsyncDataClient.java | 199 ++- .../client/async/AsyncDataHeartbeatClient.java | 81 - .../cluster/client/async/AsyncMetaClient.java | 147 +- .../client/async/AsyncMetaHeartbeatClient.java | 81 - .../cluster/client/sync/SyncClientAdaptor.java | 58 +- .../iotdb/cluster/client/sync/SyncClientPool.java | 169 -- .../iotdb/cluster/client/sync/SyncDataClient.java | 114 +- .../client/sync/SyncDataHeartbeatClient.java | 79 - .../iotdb/cluster/client/sync/SyncMetaClient.java | 121 +- .../client/sync/SyncMetaHeartbeatClient.java | 78 - .../apache/iotdb/cluster/config/ClusterConfig.java | 9 - .../iotdb/cluster/config/ClusterConstant.java | 86 + .../iotdb/cluster/config/ClusterDescriptor.java | 12 +- .../iotdb/cluster/coordinator/Coordinator.java | 55 +- .../apache/iotdb/cluster/log/LogDispatcher.java | 18 +- .../iotdb/cluster/log/applier/DataLogApplier.java | 9 +- .../iotdb/cluster/log/catchup/LogCatchUpTask.java | 5 +- .../manage/FilePartitionedSnapshotLogManager.java | 2 + .../log/manage/PartitionedSnapshotLogManager.java | 5 +- .../iotdb/cluster/log/manage/RaftLogManager.java | 23 +- .../iotdb/cluster/log/snapshot/FileSnapshot.java | 12 +- .../cluster/log/snapshot/MetaSimpleSnapshot.java | 2 +- .../cluster/log/snapshot/PullSnapshotTask.java | 5 +- .../apache/iotdb/cluster/metadata/CMManager.java | 204 ++- .../apache/iotdb/cluster/metadata/MetaPuller.java | 77 +- .../iotdb/cluster/partition/PartitionGroup.java | 5 + .../cluster/query/ClusterPhysicalGenerator.java | 11 - .../iotdb/cluster/query/ClusterPlanExecutor.java | 297 ++-- .../cluster/query/aggregate/ClusterAggregator.java | 33 +- .../cluster/query/fill/ClusterLinearFill.java | 8 +- .../cluster/query/fill/ClusterPreviousFill.java | 65 +- .../query/groupby/RemoteGroupByExecutor.java | 81 +- .../query/last/ClusterLastQueryExecutor.java | 67 +- .../cluster/query/reader/ClusterReaderFactory.java | 59 +- .../iotdb/cluster/query/reader/DataSourceInfo.java | 78 +- .../reader/RemoteSeriesReaderByTimestamp.java | 15 +- .../query/reader/RemoteSimpleSeriesReader.java | 15 +- .../query/reader/mult/MultDataSourceInfo.java | 56 +- .../query/reader/mult/RemoteMultSeriesReader.java | 31 +- .../apache/iotdb/cluster/server/ClientServer.java | 311 ---- .../iotdb/cluster/server/ClusterRPCService.java | 62 +- .../cluster/server/ClusterRPCServiceMBean.java | 21 +- .../iotdb/cluster/server/ClusterTSServiceImpl.java | 172 ++ .../iotdb/cluster/server/DataClusterServer.java | 1109 ------------ .../iotdb/cluster/server/MetaClusterServer.java | 379 ----- .../cluster/server/PullSnapshotHintService.java | 18 +- .../apache/iotdb/cluster/server/RaftServer.java | 285 ---- .../org/apache/iotdb/cluster/server/Response.java | 3 + .../iotdb/cluster/server/StoppedMemberManager.java | 9 +- .../server/clusterinfo/ClusterInfoServer.java | 1 + .../handlers/caller/AppendNodeEntryHandler.java | 8 +- .../server/handlers/caller/HeartbeatHandler.java | 6 +- .../server/heartbeat/DataHeartbeatServer.java | 83 - .../cluster/server/heartbeat/HeartbeatServer.java | 212 --- .../cluster/server/heartbeat/HeartbeatThread.java | 29 +- .../server/heartbeat/MetaHeartbeatServer.java | 91 - .../server/heartbeat/MetaHeartbeatThread.java | 6 + .../cluster/server/member/DataGroupMember.java | 90 +- .../server/member/DataGroupMemberMBean.java | 11 +- .../cluster/server/member/MetaGroupMember.java | 493 +++--- .../server/member/MetaGroupMemberMBean.java | 21 +- .../iotdb/cluster/server/member/RaftMember.java | 268 +-- .../member/RaftMemberMBean.java} | 41 +- .../server/raft/AbstractDataRaftService.java | 54 + .../server/raft/AbstractMetaRaftService.java | 52 + .../cluster/server/raft/AbstractRaftService.java | 76 + .../server/raft/DataRaftHeartBeatService.java | 68 + .../server/raft/DataRaftHeartBeatServiceMBean.java | 12 +- .../iotdb/cluster/server/raft/DataRaftService.java | 65 + .../cluster/server/raft/DataRaftServiceMBean.java | 12 +- .../server/raft/MetaRaftHeartBeatService.java | 68 + .../server/raft/MetaRaftHeartBeatServiceMBean.java | 12 +- .../iotdb/cluster/server/raft/MetaRaftService.java | 65 + .../cluster/server/raft/MetaRaftServiceMBean.java | 12 +- .../cluster/server/raft/RaftServiceHandler.java | 38 +- .../cluster/server/service/DataGroupEngine.java | 510 ++++++ .../server/service/DataGroupEngineMBean.java | 17 +- .../server/service/DataGroupServiceImpls.java | 743 ++++++++ .../cluster/server/service/MetaAsyncService.java | 20 +- .../cluster/server/service/MetaSyncService.java | 33 +- .../apache/iotdb/cluster/utils/ClientUtils.java | 64 +- .../apache/iotdb/cluster/utils/ClusterUtils.java | 49 - .../apache/iotdb/cluster/utils/PartitionUtils.java | 4 + .../cluster/utils/nodetool/ClusterMonitor.java | 28 +- .../iotdb/cluster/client/BaseClientTest.java | 156 ++ .../iotdb/cluster/client/ClientManagerTest.java | 212 +++ .../cluster/client/ClientPoolFactoryTest.java | 262 +++ .../cluster/client/DataClientProviderTest.java | 242 --- .../iotdb/cluster/client/MockClientManager.java} | 33 +- .../cluster/client/async/AsyncClientPoolTest.java | 209 --- .../cluster/client/async/AsyncDataClientTest.java | 109 +- .../client/async/AsyncDataHeartbeatClientTest.java | 60 - .../cluster/client/async/AsyncMetaClientTest.java | 108 +- .../client/async/AsyncMetaHeartbeatClientTest.java | 61 - .../cluster/client/sync/SyncClientPoolTest.java | 167 -- .../cluster/client/sync/SyncDataClientTest.java | 172 +- .../client/sync/SyncDataHeartbeatClientTest.java | 66 - .../cluster/client/sync/SyncMetaClientTest.java | 170 +- .../client/sync/SyncMetaHeartbeatClientTest.java | 66 - .../cluster/common/TestAsyncClientFactory.java | 55 - .../iotdb/cluster/common/TestAsyncDataClient.java | 3 +- .../iotdb/cluster/common/TestAsyncMetaClient.java | 9 +- .../apache/iotdb/cluster/common/TestSnapshot.java | 6 +- .../cluster/common/TestSyncClientFactory.java | 88 - .../org/apache/iotdb/cluster/common/TestUtils.java | 11 +- .../cluster/integration/BaseSingleNodeTest.java | 16 +- .../iotdb/cluster/integration/SingleNodeTest.java | 7 +- .../apache/iotdb/cluster/log/LogParserTest.java | 6 +- .../cluster/log/applier/DataLogApplierTest.java | 170 +- .../cluster/log/applier/MetaLogApplierTest.java | 4 +- .../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 5 - .../cluster/log/catchup/LogCatchUpTaskTest.java | 7 +- .../log/catchup/SnapshotCatchUpTaskTest.java | 9 +- .../cluster/log/logtypes/SerializeLogTest.java | 4 +- .../cluster/log/snapshot/DataSnapshotTest.java | 10 +- .../log/snapshot/MetaSimpleSnapshotTest.java | 2 +- .../cluster/log/snapshot/PullSnapshotTaskTest.java | 5 - .../iotdb/cluster/partition/MManagerWhiteBox.java | 2 +- .../cluster/partition/SlotPartitionTableTest.java | 2 +- .../cluster/query/manage/QueryCoordinatorTest.java | 8 +- .../cluster/query/reader/DatasourceInfoTest.java | 55 +- .../reader/RemoteSeriesReaderByTimestampTest.java | 132 +- .../query/reader/RemoteSimpleSeriesReaderTest.java | 104 +- .../mult/AssignPathManagedMergeReaderTest.java | 115 +- .../reader/mult/RemoteMultSeriesReaderTest.java | 220 ++- .../server/clusterinfo/ClusterInfoServerTest.java | 5 +- .../clusterinfo/ClusterInfoServiceImplTest.java | 26 +- .../caller/AppendGroupEntryHandlerTest.java | 3 +- .../caller/AppendNodeEntryHandlerTest.java | 3 +- .../handlers/caller/ElectionHandlerTest.java | 3 +- .../handlers/caller/HeartbeatHandlerTest.java | 3 +- .../handlers/caller/LogCatchUpHandlerTest.java | 3 +- .../server/heartbeat/DataHeartbeatThreadTest.java | 5 - .../server/heartbeat/HeartbeatThreadTest.java | 23 +- .../server/heartbeat/MetaHeartbeatThreadTest.java | 5 - .../iotdb/cluster/server/member/BaseMember.java | 60 +- .../cluster/server/member/DataGroupMemberTest.java | 33 +- .../cluster/server/member/MetaGroupMemberTest.java | 152 +- .../cluster/utils/CreateTemplatePlanUtil.java | 7 +- .../resources/node1conf/iotdb-engine.properties | 12 +- .../resources/node2conf/iotdb-engine.properties | 12 +- .../resources/node3conf/iotdb-engine.properties | 12 +- compile-tools/thrift/pom.xml | 2 +- distribution/src/assembly/all.xml | 4 + distribution/src/assembly/server.xml | 4 + docs/UserGuide/API/Programming-Java-Native-API.md | 133 +- docs/UserGuide/Appendix/Status-Codes.md | 1 + .../UserGuide/API/Programming-Java-Native-API.md | 130 +- docs/zh/UserGuide/Appendix/Status-Codes.md | 2 + .../Communication-Service-Protocol/RestService.md | 268 +++ example/client-cpp-example/pom.xml | 2 +- .../iotdb/AlignedTimeseriesSessionExample.java | 9 +- .../org/apache/iotdb/SessionConcurrentExample.java | 4 +- .../main/java/org/apache/iotdb/SessionExample.java | 4 +- flink-iotdb-connector/pom.xml | 5 + grafana/pom.xml | 4 +- .../apache/iotdb/hadoop/fileSystem/HDFSInput.java | 5 + .../apache/iotdb/hadoop/tsfile/TSFHadoopTest.java | 49 +- .../apache/iotdb/hive/TSFHiveInputFormatTest.java | 27 +- .../apache/iotdb/hive/TSFHiveRecordReaderTest.java | 28 +- jdbc/pom.xml | 5 + .../org/apache/iotdb/jdbc/IoTDBConnection.java | 10 +- metrics/ReadMe.md | 151 ++ metrics/dropwizard-metrics/pom.xml | 54 + .../dropwizard/DropwizardMetricManager.java | 392 +++++ .../iotdb/metrics/dropwizard/MetricName.java | 134 ++ .../dropwizard/reporter/DropwizardJmxReporter.java | 75 + .../reporter/DropwizardPrometheusReporter.java | 89 + .../prometheus/DropwizardMetricsExporter.java | 197 +++ .../dropwizard/reporter/prometheus/MetricType.java | 24 +- .../reporter/prometheus/PrometheusReporter.java | 233 +++ .../reporter/prometheus/PrometheusSender.java | 63 + .../reporter/prometheus/PrometheusTextWriter.java | 79 + .../reporter/prometheus/PushGateway.java | 150 ++ .../dropwizard/reporter/prometheus/TextFormat.java | 15 +- .../metrics/dropwizard/type/DropwizardCounter.java | 31 +- .../metrics/dropwizard/type/DropwizardGauge.java | 62 + .../dropwizard/type/DropwizardHistogram.java | 52 + .../type/DropwizardHistogramSnapshot.java | 73 + .../metrics/dropwizard/type/DropwizardRate.java | 94 ++ .../metrics/dropwizard/type/DropwizardTimer.java | 49 + .../org.apache.iotdb.metrics.MetricManager | 18 + .../services/org.apache.iotdb.metrics.Reporter | 19 + .../dropwizard/DropwizardMetricManagerTest.java | 261 +++ .../metrics/dropwizard/DropwizardMetricTest.java | 181 ++ .../dropwizard/DropwizardMetricTestPlan.java | 62 + .../metrics/dropwizard/PrometheusRunTest.java | 42 + .../src/test/resources/iotdb-metric.yml | 37 +- metrics/interface/pom.xml | 73 + .../interface/src/main/assembly/metric.xml | 4 +- .../main/assembly/resources/conf/iotdb-metric.yml | 37 +- .../apache/iotdb/metrics/CompositeReporter.java | 81 + .../org/apache/iotdb/metrics/MetricManager.java | 168 ++ .../org/apache/iotdb/metrics/MetricService.java | 156 ++ .../java/org/apache/iotdb/metrics/Reporter.java | 24 +- .../apache/iotdb/metrics/config/MetricConfig.java | 106 ++ .../metrics/config/MetricConfigDescriptor.java | 88 + .../iotdb/metrics/config/MetricConstant.java | 17 +- .../iotdb/metrics/impl/DoNothingCounter.java | 25 +- .../apache/iotdb/metrics/impl/DoNothingGauge.java | 20 +- .../iotdb/metrics/impl/DoNothingHistogram.java | 31 +- .../metrics/impl/DoNothingHistogramSnapshot.java | 61 +- .../iotdb/metrics/impl/DoNothingMetricManager.java | 178 ++ .../apache/iotdb/metrics/impl/DoNothingRate.java | 45 +- .../apache/iotdb/metrics/impl/DoNothingTimer.java | 29 +- .../org/apache/iotdb/metrics/type/Counter.java | 19 +- .../java/org/apache/iotdb/metrics/type/Gauge.java | 16 +- .../org/apache/iotdb/metrics/type/Histogram.java | 22 +- .../iotdb/metrics/type/HistogramSnapshot.java | 40 +- .../org/apache/iotdb/metrics/type/IMetric.java | 14 +- .../java/org/apache/iotdb/metrics/type/Rate.java | 31 +- .../java/org/apache/iotdb/metrics/type/Timer.java | 53 + .../apache/iotdb/metrics/utils/MonitorType.java | 41 +- .../iotdb/metrics/utils/PredefinedMetric.java | 14 +- .../apache/iotdb/metrics/utils/ReporterType.java | 19 +- .../iotdb/metrics/config/MetricConfigTest.java | 57 + .../src/main/test/resources/iotdb-metric.yml | 38 +- metrics/micrometer-metrics/pom.xml | 57 + .../iotdb/metrics/micrometer/MeterIdUtils.java | 18 +- .../micrometer/MicrometerMetricManager.java | 447 +++++ .../micrometer/reporter/MicrometerJmxReporter.java | 82 + .../reporter/MicrometerPrometheusReporter.java | 102 ++ .../metrics/micrometer/type/MicrometerCounter.java | 31 +- .../metrics/micrometer/type/MicrometerGauge.java | 33 +- .../micrometer/type/MicrometerHistogram.java | 52 + .../type/MicrometerHistogramSnapshot.java | 90 + .../metrics/micrometer/type/MicrometerRate.java | 81 + .../metrics/micrometer/type/MicrometerTimer.java | 54 + .../org.apache.iotdb.metrics.MetricManager | 18 + .../services/org.apache.iotdb.metrics.Reporter | 19 + .../metrics/micrometer/MicrometerMetricTest.java | 181 ++ .../micrometer/MicrometerMetricTestPlan.java | 62 + .../src/test/resources/iotdb-metric.yml | 37 +- server/src/assembly/server.xml => metrics/pom.xml | 38 +- openapi/pom.xml | 124 ++ openapi/src/main/openapi3/iotdb-rest.yaml | 167 ++ pom.xml | 28 +- server/pom.xml | 7 + .../resources/conf/iotdb-engine.properties | 3 +- .../assembly/resources/conf/iotdb-rest.properties | 52 +- server/src/assembly/server.xml | 4 + .../IoTDBDaemonThreadFactory.java} | 21 +- .../db/concurrent/IoTDBThreadPoolFactory.java | 186 +- .../org/apache/iotdb/db/concurrent/ThreadName.java | 13 +- .../threadpool/IThreadPoolMBean.java} | 31 +- .../WrappedScheduledExecutorService.java | 193 +++ .../WrappedScheduledExecutorServiceMBean.java} | 12 +- .../WrappedSingleThreadExecutorService.java | 119 ++ .../WrappedSingleThreadExecutorServiceMBean.java} | 12 +- .../WrappedSingleThreadScheduledExecutor.java | 124 ++ ...WrappedSingleThreadScheduledExecutorMBean.java} | 12 +- .../threadpool/WrappedThreadPoolExecutor.java | 82 + .../WrappedThreadPoolExecutorMBean.java} | 12 +- .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 29 +- .../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 + .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +- .../iotdb/db/conf/rest/IoTDBRestServiceCheck.java | 65 + .../iotdb/db/conf/rest/IoTDBRestServiceConfig.java | 143 ++ .../db/conf/rest/IoTDBRestServiceDescriptor.java | 162 ++ .../apache/iotdb/db/cq/ContinuousQueryService.java | 4 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 14 +- .../db/engine/cache/TimeSeriesMetadataCache.java | 12 +- .../engine/compaction/CompactionTaskManager.java | 8 +- .../cross/inplace/InplaceCompactionTask.java | 7 - .../cross/inplace/manage/MergeManager.java | 4 +- .../cross/inplace/task/MergeFileTask.java | 8 - .../inner/utils/InnerSpaceCompactionUtils.java | 4 - .../apache/iotdb/db/engine/flush/FlushManager.java | 2 +- .../db/engine/modification/ModificationFile.java | 10 +- .../io/LocalTextModificationAccessor.java | 29 +- .../engine/modification/io/ModificationReader.java | 3 +- .../engine/modification/io/ModificationWriter.java | 4 +- .../modification/utils/TracedBufferedReader.java | 419 +++++ .../engine/storagegroup/StorageGroupProcessor.java | 10 +- .../virtualSg/VirtualStorageGroupManager.java | 8 +- ...tException.java => ConfigurationException.java} | 24 +- ...ion.java => StorageGroupNotReadyException.java} | 11 +- .../exception/query/PathNumOverLimitException.java | 6 +- .../org/apache/iotdb/db/metadata/MManager.java | 69 +- .../iotdb/db/metadata/logfile/MLogTxtWriter.java | 65 +- .../iotdb/db/metadata/logfile/MLogWriter.java | 10 + .../db/metadata/logfile/MetadataOperationType.java | 2 + .../org/apache/iotdb/db/metadata/mtree/MTree.java | 65 +- .../iotdb/db/metadata/template/Template.java | 532 +++++- .../db/metadata/template/TemplateManager.java | 63 +- .../template/TemplateQueryType.java} | 15 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 24 + .../org/apache/iotdb/db/qp/logical/Operator.java | 4 +- .../qp/logical/crud/GroupByFillQueryOperator.java | 13 - .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 17 +- .../db/qp/physical/crud/AppendTemplatePlan.java | 218 +++ .../db/qp/physical/crud/CreateTemplatePlan.java | 365 ++-- .../db/qp/physical/crud/PruneTemplatePlan.java | 107 ++ .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 22 +- .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 101 +- .../apache/iotdb/db/qp/utils/WildcardsRemover.java | 17 +- .../dataset/groupby/GroupByEngineDataSet.java | 159 +- .../query/dataset/groupby/GroupByFillDataSet.java | 220 --- .../GroupByFillWithoutValueFilterDataSet.java | 577 +++++++ .../groupby/GroupByWithValueFilterDataSet.java | 2 +- .../groupby/GroupByWithoutValueFilterDataSet.java | 4 +- .../iotdb/db/query/executor/QueryRouter.java | 59 +- .../apache/iotdb/db/query/executor/fill/IFill.java | 83 +- .../iotdb/db/query/executor/fill/LinearFill.java | 81 +- .../iotdb/db/query/executor/fill/PreviousFill.java | 40 +- .../iotdb/db/query/executor/fill/ValueFill.java | 4 +- .../iotdb/db/query/expression/ResultColumn.java | 13 +- .../query/expression/unary/TimeSeriesOperand.java | 2 +- .../iotdb/db/rescon/AbstractPoolManager.java | 11 + .../java/org/apache/iotdb/db/rest/RestService.java | 156 ++ .../iotdb/db/rest/filter/ApiOriginFilter.java | 45 + .../iotdb/db/rest/filter/AuthorizationFilter.java | 125 ++ .../iotdb/db/rest/filter/BasicSecurityContext.java | 56 + .../java/org/apache/iotdb/db/rest/filter/User.java | 38 + .../org/apache/iotdb/db/rest/filter/UserCache.java | 56 + .../db/rest/handler/AuthorizationHandler.java | 52 + .../iotdb/db/rest/handler/ExceptionHandler.java | 69 + .../handler/PhysicalPlanConstructionHandler.java | 156 ++ .../iotdb/db/rest/handler/QueryDataSetHandler.java | 95 ++ .../db/rest/handler/RequestValidationHandler.java | 39 + .../iotdb/db/rest/impl/PingApiServiceImpl.java | 37 + .../iotdb/db/rest/impl/RestApiServiceImpl.java | 144 ++ .../java/org/apache/iotdb/db/service/IoTDB.java | 17 +- .../apache/iotdb/db/service/MetricsService.java | 4 +- .../org/apache/iotdb/db/service/RPCService.java | 14 +- .../iotdb/db/service/RPCServiceThriftHandler.java | 2 +- .../org/apache/iotdb/db/service/ServiceType.java | 11 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 143 +- .../org/apache/iotdb/db/service/UpgradeSevice.java | 9 +- .../iotdb/db/service/thrift/ThriftService.java | 22 +- .../db/service/thrift/ThriftServiceThread.java | 213 ++- .../iotdb/db/sync/receiver/SyncServerManager.java | 6 + .../iotdb/db/sync/sender/transfer/SyncClient.java | 11 +- .../org/apache/iotdb/db/tools/mlog/MLogParser.java | 8 + .../org/apache/iotdb/db/utils/SerializeUtils.java | 77 +- .../writelog/manager/MultiFileLogNodeManager.java | 5 +- .../db/writelog/node/ExclusiveWriteLogNode.java | 1 + .../iotdb/db/engine/cache/ChunkCacheTest.java | 2 +- .../compaction/inner/InnerCompactionCacheTest.java | 148 -- .../io/LocalTextModificationAccessorTest.java | 49 + .../apache/iotdb/db/integration/IoTDBAliasIT.java | 49 +- .../iotdb/db/integration/IoTDBCheckConfigIT.java | 41 +- .../iotdb/db/integration/IoTDBGroupByFillIT.java | 1785 +++++++++++++++++--- .../db/integration/IoTDBGroupByFillMixPathsIT.java | 294 ++++ .../db/integration/IoTDBGroupByMonthFillIT.java | 274 +++ .../db/integration/IoTDBPathNumOverLimitIT.java | 72 + .../db/integration/auth/IoTDBAuthorizationIT.java | 27 + .../iotdb/db/metadata/MManagerBasicTest.java | 147 +- .../iotdb/db/qp/physical/InsertRowPlanTest.java | 4 +- .../iotdb/db/qp/physical/InsertTabletPlanTest.java | 10 +- .../iotdb/db/qp/physical/PhysicalPlanTest.java | 93 +- .../dataset/groupby/GroupByFillDataSetTest.java | 170 +- .../apache/iotdb/db/rest/IoTDBRestServiceIT.java | 280 +++ .../org/apache/iotdb/db/tools/MLogParserTest.java | 14 +- .../iotdb/db/tools/TsFileSketchToolTest.java | 15 +- .../apache/iotdb/db/utils/SerializeUtilsTest.java | 743 ++++++++ .../src/test/resources/iotdb-rest.properties | 52 +- .../org/apache/iotdb/rpc/RpcTransportFactory.java | 36 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 5 +- .../java/org/apache/iotdb/session/Session.java | 225 ++- .../apache/iotdb/session/SessionConnection.java | 72 +- .../org/apache/iotdb/session/pool/SessionPool.java | 296 ++++ .../iotdb/session/template/InternalNode.java | 56 + .../iotdb/session/template/MeasurementNode.java | 67 + .../apache/iotdb/session/template/Template.java | 126 ++ .../iotdb/session/template/TemplateNode.java | 48 +- .../iotdb/session/template/TemplateQueryType.java | 15 +- .../java/org/apache/iotdb/session/SessionTest.java | 156 +- .../apache/iotdb/session/template/TemplateUT.java | 93 + site/pom.xml | 12 +- site/src/main/.vuepress/config.js | 1 + thrift/rpc-changelist.md | 14 +- thrift/src/main/thrift/rpc.thrift | 40 +- .../iotdb/tsfile/file/metadata/ChunkMetadata.java | 15 +- .../fileOutputFactory/LocalFSOutputFactory.java | 4 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 1 + .../apache/iotdb/tsfile/read/common/BatchData.java | 94 ++ .../tsfile/read/common/DescReadWriteBatchData.java | 89 + .../apache/iotdb/tsfile/read/common/RowRecord.java | 6 +- .../iotdb/tsfile/read/reader/LocalTsFileInput.java | 5 + .../iotdb/tsfile/read/reader/TsFileInput.java | 2 + .../apache/iotdb/tsfile/utils/FilePathUtils.java | 37 +- 410 files changed, 23377 insertions(+), 9169 deletions(-) diff --cc cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index ac273f4,644a869..856c452 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@@ -537,7 -583,7 +538,8 @@@ public class CMManager extends MManage } /** return storage groups paths for given deviceIds or timeseries. */ -- private List<PartialPath> getStorageGroups(List<PartialPath> paths) throws MetadataException { ++ private List<PartialPath> getStorageGroups(List<? extends PartialPath> paths) ++ throws MetadataException { Set<PartialPath> storageGroups = new HashSet<>(); for (PartialPath path : paths) { storageGroups.add( @@@ -1044,13 -1101,11 +1057,13 @@@ if (result != null) { // paths may be empty, implying that the group does not contain matched paths, so we do not // need to query other nodes in the group - List<PartialPath> partialPaths = new ArrayList<>(); + List<MeasurementPath> partialPaths = new ArrayList<>(); for (int i = 0; i < result.paths.size(); i++) { - PartialPath matchedPath = getAssembledPathFromRequest(result.paths.get(i)); + // todo check this transform + MeasurementPath matchedPath = + (MeasurementPath) getAssembledPathFromRequest(result.paths.get(i)); partialPaths.add(matchedPath); - if (withAlias) { + if (withAlias && matchedPath != null) { matchedPath.setMeasurementAlias(result.aliasList.get(i)); } } @@@ -1219,9 -1278,12 +1236,12 @@@ * original paths */ public Pair<List<PartialPath>, List<PartialPath>> getMatchedPaths( -- List<PartialPath> originalPaths) { ++ List<? extends PartialPath> originalPaths) { ConcurrentSkipListSet<PartialPath> fullPaths = new ConcurrentSkipListSet<>(); ConcurrentSkipListSet<PartialPath> nonExistPaths = new ConcurrentSkipListSet<>(); + // TODO it is not suitable for register and deregister an Object to JMX to such a frequent + // function call. + // BUT is it suitable to create a thread pool for each calling?? ExecutorService getAllPathsService = Executors.newFixedThreadPool(metaGroupMember.getPartitionTable().getGlobalGroups().size()); for (PartialPath pathStr : originalPaths) { diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java index 49abc08,80a6a94..67ea518 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java @@@ -34,7 -33,7 +34,6 @@@ import org.apache.iotdb.db.qp.physical. import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurationPlanType; import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; import org.apache.iotdb.db.service.IoTDB; --import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -44,7 -43,7 +43,6 @@@ import java.io.FileInputStream import java.io.IOException; import java.io.InputStream; import java.net.URL; --import java.util.ArrayList; import java.util.List; import java.util.Properties; @@@ -57,17 -56,17 +55,8 @@@ public class ClusterPhysicalGenerator e } @Override -- public List<TSDataType> getSeriesTypes(List<PartialPath> paths) throws MetadataException { -- List<TSDataType> dataTypes = new ArrayList<>(); -- for (PartialPath path : paths) { -- dataTypes.add(path == null ? null : IoTDB.metaManager.getSeriesType(path)); -- } -- return dataTypes; -- } -- -- @Override public List<PartialPath> groupVectorPaths(List<PartialPath> paths) throws MetadataException { - return getCMManager().groupVectorPaths(paths); + return MetaUtils.groupAlignedPaths(paths); } @Override diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java index 255182f,1f038ef..1dae797 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java @@@ -30,10 -32,10 +32,10 @@@ import org.apache.iotdb.cluster.rpc.thr import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.utils.ClusterQueryUtils; + import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java index e45a6ae,e849f04..8e41ac1 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java @@@ -27,10 -29,8 +29,8 @@@ import org.apache.iotdb.cluster.query.R import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; - import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; - import org.apache.iotdb.cluster.server.member.MetaGroupMember; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.TimeFilter; diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 5032357,87e5aeb..ac419c2 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@@ -85,15 -74,16 +74,16 @@@ import org.apache.iotdb.cluster.utils.C import org.apache.iotdb.cluster.utils.PartitionUtils; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.cluster.utils.nodetool.function.Status; + import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; - import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; - import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.PhysicalPlan; + import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.IoTDB; + import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals; diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java index 3905a68,82ecc3a..f631958 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java @@@ -32,16 -31,9 +31,9 @@@ import org.apache.iotdb.cluster.rpc.thr import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.service.IoTDB; - import org.apache.iotdb.db.utils.CommonUtils; - import org.apache.iotdb.rpc.RpcTransportFactory; - import org.apache.thrift.TProcessor; - import org.apache.thrift.protocol.TProtocolFactory; - import org.apache.thrift.server.TServer; - import org.apache.thrift.server.TThreadPoolServer; - import org.apache.thrift.transport.TServerTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --cc cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java index ebd4f5b,e4f3fc2..9a74118 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java @@@ -53,10 -56,11 +56,11 @@@ import org.apache.iotdb.db.engine.stora import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; + import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.executor.PlanExecutor; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; diff --cc cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java index d89f09d,4b42fce..8ea7439 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java @@@ -29,12 -32,16 +32,16 @@@ import org.apache.iotdb.cluster.query.R import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; + import org.apache.iotdb.cluster.rpc.thrift.RaftService; import org.apache.iotdb.cluster.server.member.MetaGroupMember; + import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.utils.SerializeUtils; + import org.apache.iotdb.rpc.RpcTransportFactory; + import org.apache.iotdb.rpc.TConfigurationConst; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java index 4deddd1,e234c3e..be2ef05 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java @@@ -19,14 -19,15 +19,15 @@@ package org.apache.iotdb.cluster.server.clusterinfo; - import org.apache.iotdb.cluster.ClusterMain; + import org.apache.iotdb.cluster.ClusterIoTDB; import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry; import org.apache.iotdb.cluster.rpc.thrift.Node; - import org.apache.iotdb.cluster.server.MetaClusterServer; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMemberTest; + import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.thrift.TException; import org.junit.After; diff --cc server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java index aa41b0d,07b2d26..d2828fa --- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java @@@ -21,8 -21,9 +21,9 @@@ package org.apache.iotdb.db.engine.modi import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; + import org.apache.iotdb.db.engine.modification.utils.TracedBufferedReader; import org.apache.iotdb.db.exception.metadata.IllegalPathException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.slf4j.Logger; diff --cc server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java index 51d7531,63cea4d..d37aefc --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java @@@ -24,10 -24,11 +24,11 @@@ import org.apache.iotdb.db.engine.stora import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; + import org.apache.iotdb.db.exception.StorageGroupNotReadyException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.TsFileProcessorException; -import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.Pair; diff --cc server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java index 85f2803,b920c75..c19a58e --- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java @@@ -23,9 -23,11 +23,11 @@@ import org.apache.iotdb.db.engine.fileS import org.apache.iotdb.db.metadata.mnode.IMNode; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.PhysicalPlan; + import org.apache.iotdb.db.qp.physical.crud.AppendTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan; + import org.apache.iotdb.db.qp.physical.crud.PruneTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.UnsetSchemaTemplatePlan; import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan; diff --cc server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java index f84e554,e3d4f3b..a9cbe9d --- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java @@@ -31,8 -42,17 +42,16 @@@ import java.io.ByteArrayOutputStream import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; + import java.util.ArrayDeque; + import java.util.ArrayList; + import java.util.Arrays; -import java.util.Collection; + import java.util.Collections; + import java.util.Deque; import java.util.HashMap; + import java.util.HashSet; + import java.util.List; import java.util.Map; + import java.util.Set; public class Template { private String name; @@@ -46,25 -68,60 +67,56 @@@ * * @param plan createTemplatePlan */ - public Template(CreateTemplatePlan plan) { + public Template(CreateTemplatePlan plan) throws IllegalPathException { + boolean isAlign; + schemaMap = new HashMap<>(); name = plan.getName(); + alignedPrefix = new HashSet<>(); + directNodes = new HashMap<>(); - // put measurement into a map for (int i = 0; i < plan.getMeasurements().size(); i++) { - IMeasurementSchema curSchema = - new UnaryMeasurementSchema( - plan.getSchemaNames().get(i), - plan.getDataTypes().get(i).get(0), - plan.getEncodings().get(i).get(0), - plan.getCompressors().get(i)); - - String path = plan.getSchemaNames().get(i); - if (schemaMap.containsKey(path)) { - throw new IllegalArgumentException( - "Duplicate measurement name in create template plan. Name is :" + path); + IMeasurementSchema curSchema; + int size = plan.getMeasurements().get(i).size(); + if (size > 1) { + isAlign = true; + } else { + // Patch for align designation ambiguity when creating from serialization + String[] thisMeasurement = + MetaUtils.splitPathToDetachedPath(plan.getMeasurements().get(i).get(0)); + String thisPrefix = + joinBySeparator(Arrays.copyOf(thisMeasurement, thisMeasurement.length - 1)); - if (plan.getAlignedPrefix() != null && plan.getAlignedPrefix().contains(thisPrefix)) { - isAlign = true; - } else { - isAlign = false; - } ++ isAlign = plan.getAlignedPrefix() != null && plan.getAlignedPrefix().contains(thisPrefix); } - schemaMap.put(path, curSchema); + // vector, aligned measurements + if (isAlign) { + IMeasurementSchema[] curSchemas; + String[] measurementsArray = new String[size]; + TSDataType[] typeArray = new TSDataType[size]; + TSEncoding[] encodingArray = new TSEncoding[size]; + CompressionType[] compressorArray = new CompressionType[size]; + + for (int j = 0; j < size; j++) { + measurementsArray[j] = plan.getMeasurements().get(i).get(j); + typeArray[j] = plan.getDataTypes().get(i).get(j); + encodingArray[j] = plan.getEncodings().get(i).get(j); + compressorArray[j] = plan.getCompressors().get(i).get(j); + } + + curSchemas = constructSchemas(measurementsArray, typeArray, encodingArray, compressorArray); + constructTemplateTree(measurementsArray, curSchemas); + + } + // normal measurement + else { + curSchema = + new UnaryMeasurementSchema( + plan.getMeasurements().get(i).get(0), + plan.getDataTypes().get(i).get(0), + plan.getEncodings().get(i).get(0), + plan.getCompressors().get(i).get(0)); + constructTemplateTree(plan.getMeasurements().get(i).get(0), curSchema); + } } } @@@ -80,18 -137,468 +132,458 @@@ return schemaMap; } -- public void setSchemaMap(Map<String, IMeasurementSchema> schemaMap) { -- this.schemaMap = schemaMap; -- } -- public boolean hasSchema(String measurementId) { return schemaMap.containsKey(measurementId); } + public IMeasurementSchema getSchema(String measurementId) { + return schemaMap.get(measurementId); + } + + public List<IMeasurementMNode> getMeasurementMNode() { + Set<IMeasurementSchema> deduplicateSchema = new HashSet<>(); + List<IMeasurementMNode> res = new ArrayList<>(); + + for (IMeasurementSchema measurementSchema : schemaMap.values()) { + if (deduplicateSchema.add(measurementSchema)) { + IMeasurementMNode measurementMNode = null; + if (measurementSchema instanceof UnaryMeasurementSchema) { + measurementMNode = + MeasurementMNode.getMeasurementMNode( + null, measurementSchema.getMeasurementId(), measurementSchema, null); + + } else if (measurementSchema instanceof VectorMeasurementSchema) { + measurementMNode = + MeasurementMNode.getMeasurementMNode( + null, + getMeasurementNodeName(measurementSchema.getMeasurementId()), + measurementSchema, + null); + } + + res.add(measurementMNode); + } + } + + return res; + } + + public String getMeasurementNodeName(String measurementName) { + return schemaMap.get(measurementName).getMeasurementId(); + } + + // region construct template tree + /** Construct aligned measurements, checks prefix equality, path duplication and conflict */ + private void constructTemplateTree(String[] alignedPaths, IMeasurementSchema[] schemas) + throws IllegalPathException { + // Only for aligned Paths, with common direct prefix + String[] pathNodes; + IEntityMNode commonPar; + String prefix = null; + List<String> measurementNames = new ArrayList<>(); + IMeasurementMNode leafNode; + + // deduplicate + Set<String> pathSet = new HashSet<>(Arrays.asList(alignedPaths)); + if (pathSet.size() != alignedPaths.length) { + throw new IllegalPathException("Duplication in paths."); + } + + Set<String> checkSet = new HashSet<>(); + for (String path : alignedPaths) { + // check aligned whether legal, and records measurements name + if (getPathNodeInTemplate(path) != null) { + throw new IllegalPathException("Path duplicated: " + prefix); + } + pathNodes = MetaUtils.splitPathToDetachedPath(path); + + if (pathNodes.length == 1) { + prefix = ""; + } else { + prefix = joinBySeparator(Arrays.copyOf(pathNodes, pathNodes.length - 1)); + } + + if (checkSet.size() == 0) { + checkSet.add(prefix); + } + if (!checkSet.contains(prefix)) { + throw new IllegalPathException( + "Aligned measurements get different paths, " + alignedPaths[0]); + } + + measurementNames.add(pathNodes[pathNodes.length - 1]); + } + + synchronized (this) { + // if not aligned now, it will be set to aligned - if (!alignedPrefix.contains(prefix)) { - alignedPrefix.add(prefix); - } ++ alignedPrefix.add(prefix); + for (int i = 0; i <= measurementNames.size() - 1; i++) { + // find the parent and add nodes to template + if (prefix.equals("")) { + leafNode = + MeasurementMNode.getMeasurementMNode(null, measurementNames.get(i), schemas[i], ""); + directNodes.put(leafNode.getName(), leafNode); + } else { + commonPar = (IEntityMNode) constructEntityPath(alignedPaths[0]); + leafNode = + MeasurementMNode.getMeasurementMNode( + commonPar, measurementNames.get(i), schemas[i], ""); + commonPar.addChild(leafNode); + } + schemaMap.put(getFullPathWithoutTemplateName(leafNode), schemas[i]); + measurementsCount++; + } + } + } + + /** Construct single measurement, only check path conflict and duplication */ + private IMeasurementMNode constructTemplateTree(String path, IMeasurementSchema schema) + throws IllegalPathException { + if (getPathNodeInTemplate(path) != null) { + throw new IllegalPathException("Path duplicated: " + path); + } + String[] pathNode = MetaUtils.splitPathToDetachedPath(path); + IMNode cur = constructEntityPath(path); + + synchronized (this) { + IMeasurementMNode leafNode = + MeasurementMNode.getMeasurementMNode( + (IEntityMNode) cur, pathNode[pathNode.length - 1], schema, ""); + if (cur == null) { + directNodes.put(leafNode.getName(), leafNode); + } else { + cur.addChild(leafNode); + } + schemaMap.put(getFullPathWithoutTemplateName(leafNode), schema); + measurementsCount++; + return leafNode; + } + } + + private IMeasurementSchema constructSchema( + String nodeName, TSDataType dataType, TSEncoding encoding, CompressionType compressor) { + return new UnaryMeasurementSchema(nodeName, dataType, encoding, compressor); + } + + private IMeasurementSchema[] constructSchemas( + String[] nodeNames, + TSDataType[] dataTypes, + TSEncoding[] encodings, + CompressionType[] compressors) { + UnaryMeasurementSchema[] schemas = new UnaryMeasurementSchema[nodeNames.length]; + for (int i = 0; i < nodeNames.length; i++) { + schemas[i] = + new UnaryMeasurementSchema(nodeNames[i], dataTypes[i], encodings[i], compressors[i]); + } + return schemas; + } + // endregion + + // region query of template + + public List<String> getAllAlignedPrefix() { + return Arrays.asList(alignedPrefix.toArray(new String[0])); + } + + public List<String> getAlignedMeasurements(String prefix) throws IllegalPathException { + if (!alignedPrefix.contains(prefix)) { + return null; + } + IMNode prefixNode = getPathNodeInTemplate(prefix); + if (prefixNode == null) { + throw new IllegalPathException(prefix, "there is no prefix IMNode."); + } + if (prefixNode.isMeasurement()) { + throw new IllegalPathException(prefix, "path is a measurement."); + } + List<String> subMeasurements = new ArrayList<>(); + for (IMNode child : prefixNode.getChildren().values()) { + if (child.isMeasurement()) { + subMeasurements.add(child.getName()); + } + } + return subMeasurements; + } + + public List<String> getAllMeasurementsPaths() { + return new ArrayList<>(schemaMap.keySet()); + } + + public List<String> getMeasurementsUnderPath(String path) { + if (path.equals("")) { + return getAllMeasurementsPaths(); + } + List<String> res = new ArrayList<>(); + try { + IMNode cur = getPathNodeInTemplate(path); + if (cur == null) { + throw new IllegalPathException(path, "Path not exists."); + } + if (cur.isMeasurement()) { + return Collections.singletonList(getFullPathWithoutTemplateName(cur)); + } + Deque<IMNode> stack = new ArrayDeque<>(); + stack.push(cur); + while (stack.size() != 0) { + cur = stack.pop(); + if (cur.isMeasurement()) { + res.add(getFullPathWithoutTemplateName(cur)); + } else { + for (IMNode child : cur.getChildren().values()) stack.push(child); + } + } + } catch (IllegalPathException e) { + e.printStackTrace(); + } + return res; + } + + public int getMeasurementsCount() { + return measurementsCount; + } + + public IMNode getPathNodeInTemplate(String path) throws IllegalPathException { + String[] pathNodes = MetaUtils.splitPathToDetachedPath(path); + if (pathNodes.length == 0) { + return null; + } + IMNode cur = directNodes.getOrDefault(pathNodes[0], null); + if (cur == null || cur.isMeasurement()) { + return cur; + } + for (int i = 1; i < pathNodes.length; i++) { + if (cur.hasChild(pathNodes[i])) { + cur = cur.getChild(pathNodes[i]); + } else { + return null; + } + } + return cur; + } + + public boolean isPathExistInTemplate(String path) throws IllegalPathException { + String[] pathNodes = MetaUtils.splitPathToDetachedPath(path); + if (!directNodes.containsKey(pathNodes[0])) { + return false; + } + IMNode cur = directNodes.get(pathNodes[0]); + for (int i = 1; i < pathNodes.length; i++) { + if (cur.hasChild(pathNodes[i])) { + cur = cur.getChild(pathNodes[i]); + } else { + return false; + } + } + return true; + } + + public boolean isDirectNodeInTemplate(String nodeName) { + return directNodes.containsKey(nodeName); + } + + public boolean isPathMeasurement(String path) throws IllegalPathException { + String[] pathNodes = MetaUtils.splitPathToDetachedPath(path); + if (!directNodes.containsKey(pathNodes[0])) { + throw new IllegalPathException(path, "Path does not exist."); + } + IMNode cur = directNodes.get(pathNodes[0]); + for (int i = 1; i < pathNodes.length; i++) { + if (cur.hasChild(pathNodes[i])) { + cur = cur.getChild(pathNodes[i]); + } else { + throw new IllegalPathException(path, "Path does not exist."); + } + } + return cur.isMeasurement(); + } + + public IMNode getDirectNode(String nodeName) { + return directNodes.getOrDefault(nodeName, null); + } + - public Collection<IMNode> getDirectNodes() { - return directNodes.values(); - } - + // endregion + + // region inner utils + + private String getFullPathWithoutTemplateName(IMNode node) { + if (node == null) { + return ""; + } + StringBuilder builder = new StringBuilder(node.getName()); + IMNode cur = node.getParent(); + while (cur != null) { + builder.insert(0, cur.getName() + TsFileConstant.PATH_SEPARATOR); + cur = cur.getParent(); + } + return builder.toString(); + } + + /** + * @param path complete path to measurement. + * @return null if need to add direct node, will never return a measurement. + */ + private IMNode constructEntityPath(String path) throws IllegalPathException { + String[] pathNodes = MetaUtils.splitPathToDetachedPath(path); + if (pathNodes.length == 1) { + return null; + } + + IMNode cur = directNodes.get(pathNodes[0]); + if (cur == null) { + cur = new EntityMNode(null, pathNodes[0]); + directNodes.put(pathNodes[0], cur); + } + + if (cur.isMeasurement()) { + throw new IllegalPathException(path, "there is measurement in path."); + } + + for (int i = 1; i <= pathNodes.length - 2; i++) { + if (!cur.hasChild(pathNodes[i])) { + cur.addChild(pathNodes[i], new EntityMNode(cur, pathNodes[i])); + } + cur = cur.getChild(pathNodes[i]); + + if (cur.isMeasurement()) { + throw new IllegalPathException(path, "there is measurement in path."); + } + } + return cur; + } + + private static String joinBySeparator(String[] pathNodes) { + if ((pathNodes == null) || (pathNodes.length == 0)) { + return ""; + } + StringBuilder builder = new StringBuilder(pathNodes[0]); + for (int i = 1; i <= pathNodes.length - 1; i++) { + builder.append(TsFileConstant.PATH_SEPARATOR); + builder.append(pathNodes[i]); + } + return builder.toString(); + } + // endregion + + // region append of template + + public void addAlignedMeasurements( + String[] measurements, + TSDataType[] dataTypes, + TSEncoding[] encodings, + CompressionType[] compressors) + throws IllegalPathException { + IMeasurementSchema[] schema; + String prefix; + String[] pathNode; + String[] leafNodes = new String[measurements.length]; + + // If prefix exists and not aligned, it will throw exception + // Prefix equality will be checked in constructTemplateTree + pathNode = MetaUtils.splitPathToDetachedPath(measurements[0]); + prefix = joinBySeparator(Arrays.copyOf(pathNode, pathNode.length - 1)); + if ((getPathNodeInTemplate(prefix) != null) && (!alignedPrefix.contains(prefix))) { + throw new IllegalPathException(prefix, "path already exists but not aligned"); + } + + for (int i = 0; i <= measurements.length - 1; i++) { + pathNode = MetaUtils.splitPathToDetachedPath(measurements[i]); + leafNodes[i] = pathNode[pathNode.length - 1]; + } + schema = constructSchemas(leafNodes, dataTypes, encodings, compressors); + constructTemplateTree(measurements, schema); + } + + public void addUnalignedMeasurements( + String[] measurements, + TSDataType[] dataTypes, + TSEncoding[] encodings, + CompressionType[] compressors) + throws IllegalPathException { + String prefix; + String[] pathNode; + + // deduplicate + Set<String> pathSet = new HashSet<>(Arrays.asList(measurements)); + if (pathSet.size() != measurements.length) { + throw new IllegalPathException("Duplication in paths."); + } + + for (int i = 0; i <= measurements.length - 1; i++) { + pathNode = MetaUtils.splitPathToDetachedPath(measurements[i]); + + // If prefix exists and aligned, it will throw exception + prefix = joinBySeparator(Arrays.copyOf(pathNode, pathNode.length - 1)); + if ((getPathNodeInTemplate(prefix) != null) && (alignedPrefix.contains(prefix))) { + throw new IllegalPathException(prefix, "path already exists and aligned"); + } + + IMeasurementSchema schema = + constructSchema( + pathNode[pathNode.length - 1], dataTypes[i], encodings[i], compressors[i]); + constructTemplateTree(measurements[i], schema); + } + } + + // endregion + + // region deduction of template + + public void deleteMeasurements(String path) throws IllegalPathException { + IMNode cur = getPathNodeInTemplate(path); + if (cur == null) { + throw new IllegalPathException(path, "Path does not exist"); + } + if (!cur.isMeasurement()) { + throw new IllegalPathException(path, "Path is not pointed to a measurement node."); + } + + IMNode par = cur.getParent(); + if (par == null) { + directNodes.remove(cur.getName()); + } else { + par.deleteChild(cur.getName()); + } + schemaMap.remove(getFullPathWithoutTemplateName(cur)); + measurementsCount--; + } + + public void deleteSeriesCascade(String path) throws IllegalPathException { + IMNode cur = getPathNodeInTemplate(path); + IMNode par; + + if (cur == null) { + throw new IllegalPathException(path, "Path not exists."); + } + par = cur.getParent(); + if (par == null) { + directNodes.remove(cur.getName()); + } else { + par.deleteChild(cur.getName()); + } + + // Remove all aligned prefix below the series path + Deque<IMNode> astack = new ArrayDeque<>(); + astack.push(cur); + while (astack.size() != 0) { + IMNode top = astack.pop(); + if (!top.isMeasurement()) { + String thisPrefix = getFullPathWithoutTemplateName(top); - if (alignedPrefix.contains(thisPrefix)) { - alignedPrefix.remove(thisPrefix); - } ++ alignedPrefix.remove(thisPrefix); + for (IMNode child : top.getChildren().values()) { + astack.push(child); + } + } else { + schemaMap.remove(getFullPathWithoutTemplateName(top)); + measurementsCount--; + } + } + } + + public void deleteAlignedPrefix(String path) { - if (alignedPrefix.contains(path)) { - alignedPrefix.remove(path); - } ++ alignedPrefix.remove(path); + } + // endregion + public ByteBuffer serialize() { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); diff --cc server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateManager.java index cbf93f8,010dad6..efcdf4b --- a/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateManager.java @@@ -21,10 -21,17 +21,17 @@@ package org.apache.iotdb.db.metadata.te import org.apache.iotdb.db.exception.metadata.DuplicatedTemplateException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException; -import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.mnode.IMNode; ++import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.metadata.utils.MetaFormatUtils; + import org.apache.iotdb.db.metadata.utils.MetaUtils; + import org.apache.iotdb.db.qp.physical.crud.AppendTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan; + import org.apache.iotdb.db.qp.physical.crud.PruneTemplatePlan; import org.apache.iotdb.db.utils.TestOnly; + import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import java.util.List; import java.util.Map; diff --cc server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java index 4325fb4,c9cda05..b1bdbba --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java @@@ -20,9 -20,10 +20,10 @@@ package org.apache.iotdb.db.qp.physical import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; + import org.apache.iotdb.db.qp.physical.crud.AppendTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; @@@ -252,6 -254,11 +254,11 @@@ public abstract class PhysicalPlan } } + /** Used to check whether a user has the permission to execute the plan with these paths. */ - public List<PartialPath> getAuthPaths() { ++ public List<? extends PartialPath> getAuthPaths() { + return getPaths(); + } + public static class Factory { private Factory() { diff --cc server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AppendTemplatePlan.java index 0000000,a6ca8b4..4adb985 mode 000000,100644..100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AppendTemplatePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AppendTemplatePlan.java @@@ -1,0 -1,218 +1,218 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.iotdb.db.qp.physical.crud; + -import org.apache.iotdb.db.metadata.PartialPath; ++import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.qp.logical.Operator.OperatorType; + import org.apache.iotdb.db.qp.physical.PhysicalPlan; + import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + + import java.io.DataOutputStream; + import java.io.IOException; + import java.nio.ByteBuffer; + import java.util.Arrays; + import java.util.List; + + public class AppendTemplatePlan extends PhysicalPlan { + + String name; + boolean isAligned; + String[] measurements; + TSDataType[] dataTypes; + TSEncoding[] encodings; + CompressionType[] compressors; + + public AppendTemplatePlan() { + super(false, OperatorType.APPEND_TEMPLATE); + } + + public AppendTemplatePlan( + String name, + boolean isAligned, + List<String> measurements, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors) { + super(false, OperatorType.APPEND_TEMPLATE); + this.name = name; + this.isAligned = isAligned; + this.measurements = measurements.toArray(new String[0]); + this.dataTypes = dataTypes.toArray(new TSDataType[0]); + this.encodings = encodings.toArray(new TSEncoding[0]); + this.compressors = compressors.toArray(new CompressionType[0]); + } + + public AppendTemplatePlan( + String name, + boolean isAligned, + String[] measurements, + TSDataType[] dataTypes, + TSEncoding[] encodings, + CompressionType[] compressors) { + super(false, OperatorType.APPEND_TEMPLATE); + this.name = name; + this.isAligned = isAligned; + this.measurements = measurements; + this.dataTypes = dataTypes; + this.encodings = encodings; + this.compressors = compressors; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public boolean isAligned() { + return isAligned; + } + + public List<String> getMeasurements() { + return Arrays.asList(measurements); + } + + public List<TSDataType> getDataTypes() { + return Arrays.asList(dataTypes); + } + + public List<TSEncoding> getEncodings() { + return Arrays.asList(encodings); + } + + public List<CompressionType> getCompressors() { + return Arrays.asList(compressors); + } + + @Override + public void serialize(ByteBuffer buffer) { + buffer.put((byte) PhysicalPlanType.APPEND_TEMPLATE.ordinal()); + + ReadWriteIOUtils.write(name, buffer); + ReadWriteIOUtils.write(isAligned, buffer); + + // measurements + ReadWriteIOUtils.write(measurements.length, buffer); + for (String measurement : measurements) { + ReadWriteIOUtils.write(measurement, buffer); + } + + // datatypes + ReadWriteIOUtils.write(dataTypes.length, buffer); + for (TSDataType dataType : dataTypes) { + ReadWriteIOUtils.write(dataType.ordinal(), buffer); + } + + // encoding + ReadWriteIOUtils.write(encodings.length, buffer); + for (TSEncoding encoding : encodings) { + ReadWriteIOUtils.write(encoding.ordinal(), buffer); + } + + // compressor + ReadWriteIOUtils.write(compressors.length, buffer); + for (CompressionType type : compressors) { + ReadWriteIOUtils.write(type.ordinal(), buffer); + } + + buffer.putLong(index); + } + + @Override + @SuppressWarnings("Duplicates") + public void deserialize(ByteBuffer buffer) { + name = ReadWriteIOUtils.readString(buffer); + isAligned = ReadWriteIOUtils.readBool(buffer); + + // measurements + int size = ReadWriteIOUtils.readInt(buffer); + measurements = new String[size]; + for (int i = 0; i < size; i++) { + measurements[i] = ReadWriteIOUtils.readString(buffer); + } + + // datatypes + size = ReadWriteIOUtils.readInt(buffer); + dataTypes = new TSDataType[size]; + for (int i = 0; i < size; i++) { + dataTypes[i] = TSDataType.values()[ReadWriteIOUtils.readInt(buffer)]; + } + + // encodings + size = ReadWriteIOUtils.readInt(buffer); + encodings = new TSEncoding[size]; + for (int i = 0; i < size; i++) { + encodings[i] = TSEncoding.values()[ReadWriteIOUtils.readInt(buffer)]; + } + + // compressor + size = ReadWriteIOUtils.readInt(buffer); + compressors = new CompressionType[size]; + for (int i = 0; i < size; i++) { + compressors[i] = CompressionType.values()[ReadWriteIOUtils.readInt(buffer)]; + } + + this.index = buffer.getLong(); + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.writeByte((byte) PhysicalPlanType.APPEND_TEMPLATE.ordinal()); + + ReadWriteIOUtils.write(name, stream); + ReadWriteIOUtils.write(isAligned, stream); + + // measurements + ReadWriteIOUtils.write(measurements.length, stream); + for (String measurement : measurements) { + ReadWriteIOUtils.write(measurement, stream); + } + + // datatype + ReadWriteIOUtils.write(dataTypes.length, stream); + for (TSDataType dataType : dataTypes) { + ReadWriteIOUtils.write(dataType.ordinal(), stream); + } + + // encoding + ReadWriteIOUtils.write(encodings.length, stream); + for (TSEncoding encoding : encodings) { + ReadWriteIOUtils.write(encoding.ordinal(), stream); + } + + // compressor + ReadWriteIOUtils.write(compressors.length, stream); + for (CompressionType type : compressors) { + ReadWriteIOUtils.write(type.ordinal(), stream); + } + + stream.writeLong(index); + } + + @Override + public List<PartialPath> getPaths() { + return null; + } + } diff --cc server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java index fe42e11,028b39f..0fd43b3 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java @@@ -19,9 -19,11 +19,11 @@@ package org.apache.iotdb.db.qp.physical.crud; + import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; + import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; diff --cc server/src/main/java/org/apache/iotdb/db/qp/physical/crud/PruneTemplatePlan.java index 0000000,a40ad1a..c6d18af mode 000000,100644..100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/PruneTemplatePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/PruneTemplatePlan.java @@@ -1,0 -1,107 +1,107 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.iotdb.db.qp.physical.crud; + -import org.apache.iotdb.db.metadata.PartialPath; ++import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.qp.logical.Operator.OperatorType; + import org.apache.iotdb.db.qp.physical.PhysicalPlan; + import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + + import java.io.DataOutputStream; + import java.io.IOException; + import java.nio.ByteBuffer; + import java.util.Arrays; + import java.util.List; + + public class PruneTemplatePlan extends PhysicalPlan { + + String name; + String[] prunedMeasurements; + + public PruneTemplatePlan() { + super(false, OperatorType.PRUNE_TEMPLATE); + } + + public PruneTemplatePlan(String name, List<String> prunedMeasurements) { + super(false, OperatorType.PRUNE_TEMPLATE); + + this.name = name; + this.prunedMeasurements = prunedMeasurements.toArray(new String[0]); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public List<String> getPrunedMeasurements() { + return Arrays.asList(prunedMeasurements); + } + + @Override + public void serialize(ByteBuffer buffer) { + buffer.put((byte) PhysicalPlanType.PRUNE_TEMPLATE.ordinal()); + + ReadWriteIOUtils.write(name, buffer); + + // measurements + ReadWriteIOUtils.write(prunedMeasurements.length, buffer); + for (String measurement : prunedMeasurements) { + ReadWriteIOUtils.write(measurement, buffer); + } + + buffer.putLong(index); + } + + @Override + public void deserialize(ByteBuffer buffer) { + name = ReadWriteIOUtils.readString(buffer); + + int size = ReadWriteIOUtils.readInt(buffer); + prunedMeasurements = new String[size]; + for (int i = 0; i < size; i++) { + prunedMeasurements[i] = ReadWriteIOUtils.readString(buffer); + } + + this.index = buffer.getLong(); + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.writeByte((byte) PhysicalPlanType.PRUNE_TEMPLATE.ordinal()); + + ReadWriteIOUtils.write(name, stream); + + ReadWriteIOUtils.write(prunedMeasurements.length, stream); + for (String measurement : prunedMeasurements) { + ReadWriteIOUtils.write(measurement, stream); + } + + stream.writeLong(index); + } + + @Override + public List<PartialPath> getPaths() { + return null; + } + } diff --cc server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java index 0000000,2eab664..20e2233 mode 000000,100644..100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillWithoutValueFilterDataSet.java @@@ -1,0 -1,577 +1,577 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.iotdb.db.query.dataset.groupby; + + import org.apache.iotdb.db.engine.StorageEngine; + import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; + import org.apache.iotdb.db.exception.StorageEngineException; + import org.apache.iotdb.db.exception.query.QueryProcessException; + import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException; -import org.apache.iotdb.db.metadata.PartialPath; ++import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan; + import org.apache.iotdb.db.query.aggregation.AggregateResult; + import org.apache.iotdb.db.query.aggregation.impl.CountAggrResult; + import org.apache.iotdb.db.query.context.QueryContext; + import org.apache.iotdb.db.query.executor.fill.IFill; + import org.apache.iotdb.db.query.executor.fill.LinearFill; + import org.apache.iotdb.db.query.executor.fill.PreviousFill; + import org.apache.iotdb.db.query.executor.fill.ValueFill; + import org.apache.iotdb.db.query.factory.AggregateResultFactory; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.read.TimeValuePair; + import org.apache.iotdb.tsfile.read.common.Path; + import org.apache.iotdb.tsfile.read.common.RowRecord; + import org.apache.iotdb.tsfile.read.filter.GroupByFilter; + import org.apache.iotdb.tsfile.read.filter.basic.Filter; + import org.apache.iotdb.tsfile.utils.Pair; + import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.stream.Collectors; + + public class GroupByFillWithoutValueFilterDataSet extends GroupByWithoutValueFilterDataSet { + + private static final Logger logger = + LoggerFactory.getLogger(GroupByFillWithoutValueFilterDataSet.class); + + private Map<TSDataType, IFill> fillTypes; + private final List<PartialPath> deduplicatedPaths; + private final List<String> aggregations; + private Map<PartialPath, GroupByExecutor> extraPreviousExecutors = null; + private Map<PartialPath, GroupByExecutor> extraNextExecutors = null; + + // the extra previous means first not null value before startTime + // used to fill result before the first not null data + private Object[] extraPreviousValues; + private long[] extraPreviousTimes; + + // the previous value for each time series, which means + // first not null value GEQ curStartTime in order asc + // second not null value GEQ curStartTime in order desc + private Object[] previousValues; + private long[] previousTimes; + + // the extra next means first not null value after endTime + // used to fill result after the last not null data + private Object[] extraNextValues; + private long[] extraNextTimes; + + // the next value for each time series, which means + // first not null value LEQ curStartTime in order desc + // second not null value LEQ curStartTime in order asc + private Object[] nextValues; + private long[] nextTimes; + + // the result datatype for each time series + private TSDataType[] resultDataType; + + // the next query time range of each path + private long[] queryStartTimes; + private long[] queryEndTimes; + private boolean[] hasCachedQueryInterval; + + public GroupByFillWithoutValueFilterDataSet( + QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan) + throws QueryProcessException, StorageEngineException { + super(context, groupByTimeFillPlan); + this.aggregations = groupByTimeFillPlan.getDeduplicatedAggregations(); + + this.deduplicatedPaths = new ArrayList<>(); + for (Path path : paths) { + PartialPath partialPath = (PartialPath) path; + if (!deduplicatedPaths.contains(partialPath)) { + deduplicatedPaths.add(partialPath); + } + } + + initArrays(); + initExtraExecutors(context, groupByTimeFillPlan); + if (extraPreviousExecutors != null) { + initExtraArrays(extraPreviousValues, extraPreviousTimes, true, extraPreviousExecutors); + } + if (extraNextExecutors != null) { + initExtraArrays(extraNextValues, extraNextTimes, false, extraNextExecutors); + } + initCachedTimesAndValues(); + } + + private void initArrays() { + extraPreviousValues = new Object[aggregations.size()]; + extraPreviousTimes = new long[aggregations.size()]; + previousValues = new Object[aggregations.size()]; + previousTimes = new long[aggregations.size()]; + extraNextValues = new Object[aggregations.size()]; + extraNextTimes = new long[aggregations.size()]; + nextValues = new Object[aggregations.size()]; + nextTimes = new long[aggregations.size()]; + Arrays.fill(extraPreviousValues, null); + Arrays.fill(extraPreviousTimes, Long.MIN_VALUE); + Arrays.fill(previousValues, null); + Arrays.fill(previousTimes, Long.MIN_VALUE); + Arrays.fill(extraNextValues, null); + Arrays.fill(extraNextTimes, Long.MAX_VALUE); + Arrays.fill(nextValues, null); + Arrays.fill(nextTimes, Long.MAX_VALUE); + + queryStartTimes = new long[paths.size()]; + queryEndTimes = new long[paths.size()]; + hasCachedQueryInterval = new boolean[paths.size()]; + resultDataType = new TSDataType[aggregations.size()]; + Arrays.fill(queryStartTimes, curStartTime); + Arrays.fill(queryEndTimes, curEndTime); + Arrays.fill(hasCachedQueryInterval, true); + for (PartialPath deduplicatedPath : deduplicatedPaths) { + List<Integer> indexes = resultIndexes.get(deduplicatedPath); + for (int index : indexes) { + switch (aggregations.get(index)) { + case "avg": + case "sum": + resultDataType[index] = TSDataType.DOUBLE; + break; + case "count": + case "max_time": + case "min_time": + resultDataType[index] = TSDataType.INT64; + break; + case "first_value": + case "last_value": + case "max_value": + case "min_value": + resultDataType[index] = dataTypes.get(index); + break; + } + } + } + } + + private void getGroupByExecutors( + Map<PartialPath, GroupByExecutor> extraExecutors, + QueryContext context, + GroupByTimeFillPlan groupByTimeFillPlan, + Filter timeFilter, + boolean isAscending) + throws StorageEngineException, QueryProcessException { + List<StorageGroupProcessor> list = + StorageEngine.getInstance() + .mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList())); + try { + // init resultIndexes, group result indexes by path + for (int i = 0; i < paths.size(); i++) { + PartialPath path = (PartialPath) paths.get(i); + if (!extraExecutors.containsKey(path)) { + // init GroupByExecutor + extraExecutors.put( + path, + getGroupByExecutor( + path, + groupByTimeFillPlan.getAllMeasurementsInDevice(path.getDevice()), + dataTypes.get(i), + context, + timeFilter.copy(), + null, + isAscending)); + } + AggregateResult aggrResult = + AggregateResultFactory.getAggrResultByName( + groupByTimeFillPlan.getDeduplicatedAggregations().get(i), + dataTypes.get(i), + ascending); + extraExecutors.get(path).addAggregateResult(aggrResult); + } + } finally { + StorageEngine.getInstance().mergeUnLock(list); + } + } + + /* Init extra path executors to query data outside the original group by query */ + private void initExtraExecutors(QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan) + throws StorageEngineException, QueryProcessException { + long minQueryStartTime = Long.MAX_VALUE; + long maxQueryEndTime = Long.MIN_VALUE; + this.fillTypes = groupByTimeFillPlan.getFillType(); + for (Map.Entry<TSDataType, IFill> IFillEntry : fillTypes.entrySet()) { + IFill fill = IFillEntry.getValue(); + if (fill instanceof PreviousFill) { + fill.convertRange(startTime, endTime); + minQueryStartTime = Math.min(minQueryStartTime, fill.getQueryStartTime()); + } else if (fill instanceof LinearFill) { + fill.convertRange(startTime, endTime); + minQueryStartTime = Math.min(minQueryStartTime, fill.getQueryStartTime()); + maxQueryEndTime = Math.max(maxQueryEndTime, fill.getQueryEndTime()); + } + } + + if (minQueryStartTime < Long.MAX_VALUE) { + extraPreviousExecutors = new HashMap<>(); + + long queryRange = minQueryStartTime - startTime; + long extraStartTime, intervalNum; + if (isSlidingStepByMonth) { + intervalNum = (long) Math.ceil(queryRange / (double) (slidingStep * MS_TO_MONTH)); + extraStartTime = calcIntervalByMonth(startTime, intervalNum * slidingStep); + while (extraStartTime < minQueryStartTime) { + intervalNum += 1; + extraStartTime = calcIntervalByMonth(startTime, intervalNum * slidingStep); + } + } else { + intervalNum = (long) Math.ceil(queryRange / (double) slidingStep); + extraStartTime = slidingStep * intervalNum + startTime; + } + + Filter timeFilter = new GroupByFilter(interval, slidingStep, extraStartTime, startTime); + getGroupByExecutors(extraPreviousExecutors, context, groupByTimeFillPlan, timeFilter, false); + } + + if (maxQueryEndTime > Long.MIN_VALUE) { + extraNextExecutors = new HashMap<>(); + Pair<Long, Long> lastTimeRange = getLastTimeRange(); + lastTimeRange = getNextTimeRange(lastTimeRange.left, true, false); + Filter timeFilter = + new GroupByFilter(interval, slidingStep, lastTimeRange.left, maxQueryEndTime); + getGroupByExecutors(extraNextExecutors, context, groupByTimeFillPlan, timeFilter, true); + } + } + + /* check if specified path has next extra range */ + private boolean pathHasExtra(int pathId, boolean isExtraPrevious, long extraStartTime) { + List<Integer> Indexes = resultIndexes.get(deduplicatedPaths.get(pathId)); + for (int resultIndex : Indexes) { + if (isExtraPrevious && extraPreviousValues[resultIndex] != null) { + continue; + } else if (!isExtraPrevious && extraNextValues[resultIndex] != null) { + continue; + } + + IFill fill = fillTypes.get(resultDataType[resultIndex]); + if (fill == null) { + continue; + } + if (fill instanceof PreviousFill && isExtraPrevious) { + if (fill.getQueryStartTime() <= extraStartTime) { + return true; + } + } else if (fill instanceof LinearFill) { + if (isExtraPrevious) { + if (fill.getQueryStartTime() <= extraStartTime) { + return true; + } + } else { + if (extraStartTime < fill.getQueryEndTime()) { + return true; + } + } + } + } + + return false; + } + + private void initExtraArrays( + Object[] extraValues, + long[] extraTimes, + boolean isExtraPrevious, + Map<PartialPath, GroupByExecutor> extraExecutors) + throws QueryProcessException { + for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) { + GroupByExecutor executor = extraExecutors.get(deduplicatedPaths.get(pathId)); + List<Integer> Indexes = resultIndexes.get(deduplicatedPaths.get(pathId)); + + Pair<Long, Long> extraTimeRange; + if (isExtraPrevious) { + extraTimeRange = getFirstTimeRange(); + } else { + extraTimeRange = getLastTimeRange(); + } + + extraTimeRange = getNextTimeRange(extraTimeRange.left, !isExtraPrevious, false); + try { + while (pathHasExtra(pathId, isExtraPrevious, extraTimeRange.left)) { + List<AggregateResult> aggregations = + executor.calcResult(extraTimeRange.left, extraTimeRange.right); + if (!resultIsNull(aggregations)) { + // we check extra time range in single path together, + // thus the extra result will be cached together + for (int i = 0; i < aggregations.size(); i++) { + if (extraValues[Indexes.get(i)] == null) { + extraValues[Indexes.get(i)] = aggregations.get(i).getResult(); + extraTimes[Indexes.get(i)] = extraTimeRange.left; + } + } + } + + extraTimeRange = getNextTimeRange(extraTimeRange.left, !isExtraPrevious, false); + } + } catch (IOException e) { + throw new QueryProcessException(e.getMessage()); + } + } + } + + private boolean pathHasNext(int pathId) { + // has cached + if (hasCachedQueryInterval[pathId]) { + return true; + } + + // find the next aggregation interval + Pair<Long, Long> nextTimeRange = getNextTimeRange(queryStartTimes[pathId], ascending, true); + if (nextTimeRange == null) { + return false; + } + queryStartTimes[pathId] = nextTimeRange.left; + queryEndTimes[pathId] = nextTimeRange.right; + + hasCachedQueryInterval[pathId] = true; + return true; + } + + /* If result is null or CountAggrResult is 0, then result is NULL */ + private boolean resultIsNull(List<AggregateResult> aggregateResults) { + AggregateResult result = aggregateResults.get(0); + if (result.getResult() == null) { + return true; + } else { + return result instanceof CountAggrResult && (long) result.getResult() == 0; + } + } + + private void pathGetNext(int pathId) throws IOException { + GroupByExecutor executor = pathExecutors.get(deduplicatedPaths.get(pathId)); + List<Integer> resultIndex = resultIndexes.get(deduplicatedPaths.get(pathId)); + + // Slide value and time + pathSlideNext(pathId); + + List<AggregateResult> aggregations; + try { + // get second not null aggregate results + aggregations = executor.calcResult(queryStartTimes[pathId], queryEndTimes[pathId]); + hasCachedQueryInterval[pathId] = false; + while (resultIsNull(aggregations) && pathHasNext(pathId)) { + aggregations = executor.calcResult(queryStartTimes[pathId], queryEndTimes[pathId]); + hasCachedQueryInterval[pathId] = false; + } + } catch (QueryProcessException e) { + logger.error("GroupByFillWithoutValueFilterDataSet execute has error: ", e); + throw new IOException(e.getMessage(), e); + } + + if (resultIsNull(aggregations)) { + pathSlide(pathId); + } else { + for (int i = 0; i < aggregations.size(); i++) { + int Index = resultIndex.get(i); + if (ascending) { + nextValues[Index] = aggregations.get(i).getResult(); + nextTimes[Index] = queryStartTimes[pathId]; + } else { + previousValues[Index] = aggregations.get(i).getResult(); + previousTimes[Index] = queryStartTimes[pathId]; + } + } + } + + hasCachedQueryInterval[pathId] = false; + } + + private void pathSlideNext(int pathId) { + List<Integer> resultIndex = resultIndexes.get(deduplicatedPaths.get(pathId)); + if (ascending) { + for (int resultId : resultIndex) { + previousValues[resultId] = nextValues[resultId]; + previousTimes[resultId] = nextTimes[resultId]; + nextValues[resultId] = null; + nextTimes[resultId] = Long.MAX_VALUE; + } + } else { + for (int resultId : resultIndex) { + nextValues[resultId] = previousValues[resultId]; + nextTimes[resultId] = previousTimes[resultId]; + previousValues[resultId] = null; + previousTimes[resultId] = Long.MIN_VALUE; + } + } + } + + private void pathSlideExtra(int pathId) { + List<Integer> resultIndex = resultIndexes.get(deduplicatedPaths.get(pathId)); + if (ascending) { + for (int Index : resultIndex) { + nextValues[Index] = extraNextValues[Index]; + nextTimes[Index] = extraNextTimes[Index]; + } + } else { + for (int Index : resultIndex) { + previousValues[Index] = extraPreviousValues[Index]; + previousTimes[Index] = extraPreviousTimes[Index]; + } + } + } + + private void pathSlide(int pathId) throws IOException { + if (pathHasNext(pathId)) { + pathGetNext(pathId); + } else { + pathSlideExtra(pathId); + } + } + + /* Cache the previous and next query data before group by fill query */ + private void initCachedTimesAndValues() throws QueryProcessException { + for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) { + try { + pathSlide(pathId); + pathSlide(pathId); + } catch (IOException e) { + throw new QueryProcessException(e.getMessage()); + } + } + } + + private void fillRecord( + int resultId, RowRecord record, Pair<Long, Object> beforePair, Pair<Long, Object> afterPair) + throws IOException { + // Don't fill count aggregation + if (Objects.equals(aggregations.get(resultId), "count")) { + record.addField((long) 0, TSDataType.INT64); + return; + } + + IFill fill = fillTypes.get(resultDataType[resultId]); + if (fill == null) { + record.addField(null); + return; + } + + if (fill instanceof PreviousFill) { + if (beforePair.right != null + && (fill.getBeforeRange() == -1 + || fill.insideBeforeRange(beforePair.left, record.getTimestamp())) + && ((!((PreviousFill) fill).isUntilLast()) + || (afterPair.right != null && afterPair.left < endTime))) { + record.addField(beforePair.right, resultDataType[resultId]); + } else { + record.addField(null); + } + } else if (fill instanceof LinearFill) { + LinearFill linearFill = new LinearFill(); + if (beforePair.right != null + && afterPair.right != null + && (fill.getBeforeRange() == -1 + || fill.insideBeforeRange(beforePair.left, record.getTimestamp())) + && (fill.getAfterRange() == -1 + || fill.insideAfterRange(afterPair.left, record.getTimestamp()))) { + try { + TimeValuePair filledPair = + linearFill.averageWithTimeAndDataType( + new TimeValuePair( + beforePair.left, + TsPrimitiveType.getByType(resultDataType[resultId], beforePair.right)), + new TimeValuePair( + afterPair.left, + TsPrimitiveType.getByType(resultDataType[resultId], afterPair.right)), + curStartTime, + resultDataType[resultId]); + record.addField(filledPair.getValue().getValue(), resultDataType[resultId]); + } catch (UnSupportedFillTypeException e) { + record.addField(null); + throw new IOException(e); + } + } else { + record.addField(null); + } + } else if (fill instanceof ValueFill) { + try { + TimeValuePair filledPair = fill.getFillResult(); + record.addField(filledPair.getValue().getValue(), resultDataType[resultId]); + } catch (QueryProcessException | StorageEngineException e) { + throw new IOException(e); + } + } + } + + @Override + public RowRecord nextWithoutConstraint() throws IOException { + if (!hasCachedTimeInterval) { + throw new IOException( + "need to call hasNext() before calling next() " + + "in GroupByFillWithoutValueFilterDataSet."); + } + hasCachedTimeInterval = false; + RowRecord record = new RowRecord(curStartTime); + + boolean[] pathNeedSlide = new boolean[previousTimes.length]; + Arrays.fill(pathNeedSlide, false); + for (int resultId = 0; resultId < previousTimes.length; resultId++) { + if (previousTimes[resultId] == curStartTime) { + record.addField(previousValues[resultId], resultDataType[resultId]); + if (!ascending) { + pathNeedSlide[resultId] = true; + } + } else if (nextTimes[resultId] == curStartTime) { + record.addField(nextValues[resultId], resultDataType[resultId]); + if (ascending) { + pathNeedSlide[resultId] = true; + } + } else if (previousTimes[resultId] < curStartTime && curStartTime < nextTimes[resultId]) { + fillRecord( + resultId, + record, + new Pair<>(previousTimes[resultId], previousValues[resultId]), + new Pair<>(nextTimes[resultId], nextValues[resultId])); + } else if (curStartTime < previousTimes[resultId]) { + fillRecord( + resultId, + record, + new Pair<>(extraPreviousTimes[resultId], extraPreviousValues[resultId]), + new Pair<>(previousTimes[resultId], previousValues[resultId])); + } else if (nextTimes[resultId] < curStartTime) { + fillRecord( + resultId, + record, + new Pair<>(nextTimes[resultId], nextValues[resultId]), + new Pair<>(extraNextTimes[resultId], extraNextValues[resultId])); + } + } + + // Slide paths + // the aggregation results of one path are either all null or all not null, + // thus slide all results together + for (int pathId = 0; pathId < deduplicatedPaths.size(); pathId++) { + List<Integer> resultIndex = resultIndexes.get(deduplicatedPaths.get(pathId)); + if (pathNeedSlide[resultIndex.get(0)]) { + pathSlide(pathId); + } + } + + if (!leftCRightO) { + record.setTimestamp(curEndTime - 1); + } + return record; + } + } diff --cc server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java index f3ef305,47c7094..f79e178 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java @@@ -21,8 -21,10 +21,10 @@@ package org.apache.iotdb.db.query.execu import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.query.context.QueryContext; + import org.apache.iotdb.db.query.control.SessionManager; + import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; diff --cc server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java index b47bfca,5dc293f..4151bbc --- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java @@@ -23,7 -23,8 +23,8 @@@ import org.apache.iotdb.db.engine.query import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.qp.utils.DatetimeUtils; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.impl.FirstValueAggrResult; import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrResult; diff --cc server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java index 0516090,c8a1842..36c99f9 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java @@@ -21,7 -21,8 +21,8 @@@ package org.apache.iotdb.db.query.execu import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.qp.utils.DatetimeUtils; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; diff --cc server/src/main/java/org/apache/iotdb/db/rest/handler/PhysicalPlanConstructionHandler.java index 0000000,0f2ec43..ea1096b mode 000000,100644..100644 --- a/server/src/main/java/org/apache/iotdb/db/rest/handler/PhysicalPlanConstructionHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/rest/handler/PhysicalPlanConstructionHandler.java @@@ -1,0 -1,156 +1,156 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.iotdb.db.rest.handler; + + import org.apache.iotdb.db.exception.WriteProcessRejectException; + import org.apache.iotdb.db.exception.metadata.IllegalPathException; -import org.apache.iotdb.db.metadata.PartialPath; ++import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; + import org.apache.iotdb.db.rest.model.InsertTabletRequest; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.utils.Binary; + import org.apache.iotdb.tsfile.utils.BitMap; + + import java.nio.charset.StandardCharsets; + import java.util.List; + + public class PhysicalPlanConstructionHandler { + private PhysicalPlanConstructionHandler() {} + + public static InsertTabletPlan constructInsertTabletPlan(InsertTabletRequest insertTabletRequest) + throws IllegalPathException, WriteProcessRejectException { + InsertTabletPlan insertTabletPlan = + new InsertTabletPlan( + new PartialPath(insertTabletRequest.getDeviceId()), + insertTabletRequest.getMeasurements()); + List<List<Object>> rawData = insertTabletRequest.getValues(); + List<String> rawDataType = insertTabletRequest.getDataTypes(); + + int rowSize = insertTabletRequest.getTimestamps().size(); + int columnSize = rawDataType.size(); + + Object[] columns = new Object[columnSize]; + BitMap[] bitMaps = new BitMap[columnSize]; + TSDataType[] dataTypes = new TSDataType[columnSize]; + + for (int i = 0; i < columnSize; i++) { + dataTypes[i] = TSDataType.valueOf(rawDataType.get(i)); + } + + for (int columnIndex = 0; columnIndex < columnSize; columnIndex++) { + bitMaps[columnIndex] = new BitMap(rowSize); + switch (dataTypes[columnIndex]) { + case BOOLEAN: + boolean[] booleanValues = new boolean[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + if (rawData.get(columnIndex).get(rowIndex) == null) { + bitMaps[columnIndex].mark(rowIndex); + } else { + booleanValues[rowIndex] = (Boolean) rawData.get(columnIndex).get(rowIndex); + } + } + columns[columnIndex] = booleanValues; + break; + case INT32: + int[] intValues = new int[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + Object object = rawData.get(columnIndex).get(rowIndex); + if (object == null) { + bitMaps[columnIndex].mark(rowIndex); + } else if (object instanceof Integer) { + intValues[rowIndex] = (int) object; + } else { + throw new WriteProcessRejectException( + "unsupported data type: " + object.getClass().toString()); + } + } + columns[columnIndex] = intValues; + break; + case INT64: + long[] longValues = new long[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + Object object = rawData.get(columnIndex).get(rowIndex); + if (object == null) { + bitMaps[columnIndex].mark(rowIndex); + } else if (object instanceof Integer) { + longValues[rowIndex] = (int) object; + } else if (object instanceof Long) { + longValues[rowIndex] = (long) object; + } else { + throw new WriteProcessRejectException( + "unsupported data type: " + object.getClass().toString()); + } + } + columns[columnIndex] = longValues; + break; + case FLOAT: + float[] floatValues = new float[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + if (rawData.get(columnIndex).get(rowIndex) == null) { + bitMaps[columnIndex].mark(rowIndex); + } else { + floatValues[rowIndex] = + ((Double) rawData.get(columnIndex).get(rowIndex)).floatValue(); + } + } + columns[columnIndex] = floatValues; + break; + case DOUBLE: + double[] doubleValues = new double[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + if (rawData.get(columnIndex).get(rowIndex) == null) { + bitMaps[columnIndex].mark(rowIndex); + } else { + doubleValues[rowIndex] = (double) rawData.get(columnIndex).get(rowIndex); + } + } + columns[columnIndex] = doubleValues; + break; + case TEXT: + Binary[] binaryValues = new Binary[rowSize]; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + if (rawData.get(columnIndex).get(rowIndex) == null) { + bitMaps[columnIndex].mark(rowIndex); + binaryValues[rowIndex] = new Binary("".getBytes(StandardCharsets.UTF_8)); + } else { + binaryValues[rowIndex] = + new Binary( + rawData + .get(columnIndex) + .get(rowIndex) + .toString() + .getBytes(StandardCharsets.UTF_8)); + } + } + columns[columnIndex] = binaryValues; + break; + default: + throw new IllegalArgumentException("Invalid input: " + rawDataType.get(columnIndex)); + } + } + + insertTabletPlan.setTimes( + insertTabletRequest.getTimestamps().stream().mapToLong(Long::longValue).toArray()); + insertTabletPlan.setColumns(columns); + insertTabletPlan.setBitMaps(bitMaps); + insertTabletPlan.setRowCount(insertTabletRequest.getTimestamps().size()); + insertTabletPlan.setDataTypes(dataTypes); + insertTabletPlan.setAligned(insertTabletRequest.getIsAligned()); + return insertTabletPlan; + } + } diff --cc server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 8de33d7,911be23..e153c54 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@@ -39,8 -37,8 +37,9 @@@ import org.apache.iotdb.db.exception.me import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.exception.runtime.SQLParserException; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.metadata.template.TemplateQueryType; import org.apache.iotdb.db.metrics.server.SqlArgument; import org.apache.iotdb.db.qp.Planner; import org.apache.iotdb.db.qp.constant.SQLConstant; diff --cc server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogParser.java index 43e5ceb,96a81ee..e3f7a33 --- a/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogParser.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogParser.java @@@ -18,11 -18,13 +18,13 @@@ */ package org.apache.iotdb.db.tools.mlog; -import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.logfile.MLogReader; import org.apache.iotdb.db.metadata.logfile.MLogTxtWriter; +import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.PhysicalPlan; + import org.apache.iotdb.db.qp.physical.crud.AppendTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan; + import org.apache.iotdb.db.qp.physical.crud.PruneTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan; import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan; import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan; diff --cc server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java index 389c315,d6e72ad..5a7230a --- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java @@@ -25,8 -25,7 +25,9 @@@ import org.apache.iotdb.db.exception.me import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.metadata.mnode.IMNode; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.metadata.utils.MetaUtils; import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; @@@ -890,21 -909,122 +895,113 @@@ public class MManagerBasicTest } } + @Test + public void testTemplateInnerTree() { + CreateTemplatePlan plan = getTreeTemplatePlan(); + Template template; + MManager manager = IoTDB.metaManager; + + try { + manager.createSchemaTemplate(plan); + template = manager.getTemplate("treeTemplate"); + assertEquals(4, template.getMeasurementsCount()); + assertEquals("d1", template.getPathNodeInTemplate("d1").getName()); - assertEquals(null, template.getPathNodeInTemplate("notExists")); ++ assertNull(template.getPathNodeInTemplate("notExists")); + assertEquals("[GPS]", template.getAllAlignedPrefix().toString()); + + String[] alignedMeasurements = {"to.be.prefix.s1", "to.be.prefix.s2"}; + TSDataType[] dataTypes = {TSDataType.INT32, TSDataType.INT32}; + TSEncoding[] encodings = {TSEncoding.RLE, TSEncoding.RLE}; + CompressionType[] compressionTypes = {CompressionType.SNAPPY, CompressionType.SNAPPY}; + template.addAlignedMeasurements(alignedMeasurements, dataTypes, encodings, compressionTypes); + + assertEquals("[GPS, to.be.prefix]", template.getAllAlignedPrefix().toString()); + assertEquals("[s1, s2]", template.getAlignedMeasurements("to.be.prefix").toString()); + + template.deleteAlignedPrefix("to.be.prefix"); + + assertEquals("[GPS]", template.getAllAlignedPrefix().toString()); + assertEquals(null, template.getDirectNode("prefix")); + assertEquals("to", template.getDirectNode("to").getName()); + + try { + template.deleteMeasurements("a.single"); + fail(); + } catch (IllegalPathException e) { + assertEquals("a.single is not a legal path, because Path does not exist", e.getMessage()); + } + assertEquals( + "[d1.s1, GPS.x, to.be.prefix.s2, GPS.y, to.be.prefix.s1, s2]", + template.getAllMeasurementsPaths().toString()); + + template.deleteSeriesCascade("to"); + + assertEquals("[d1.s1, GPS.x, GPS.y, s2]", template.getAllMeasurementsPaths().toString()); + + } catch (MetadataException e) { + e.printStackTrace(); + } + } + + private CreateTemplatePlan getTreeTemplatePlan() { + /** + * Construct a template like: create schema template treeTemplate ( (d1.s1 INT32 GORILLA + * SNAPPY), (s2 INT32 GORILLA SNAPPY), (GPS.x FLOAT RLE SNAPPY), (GPS.y FLOAT RLE SNAPPY), )with + * aligned (GPS) + * + * <p>Check aligned path whether with same prefix? Construct tree + */ + List<List<String>> measurementList = new ArrayList<>(); + measurementList.add(Collections.singletonList("d1.s1")); + measurementList.add(Collections.singletonList("s2")); + measurementList.add(Arrays.asList("GPS.x", "GPS.y")); + + List<List<TSDataType>> dataTypeList = new ArrayList<>(); + dataTypeList.add(Collections.singletonList(TSDataType.INT32)); + dataTypeList.add(Collections.singletonList(TSDataType.INT32)); + dataTypeList.add(Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT)); + + List<List<TSEncoding>> encodingList = new ArrayList<>(); + encodingList.add(Collections.singletonList(TSEncoding.GORILLA)); + encodingList.add(Collections.singletonList(TSEncoding.GORILLA)); + encodingList.add(Arrays.asList(TSEncoding.RLE, TSEncoding.RLE)); + + List<List<CompressionType>> compressionTypes = new ArrayList<>(); + compressionTypes.add(Collections.singletonList(CompressionType.SDT)); + compressionTypes.add(Collections.singletonList(CompressionType.SNAPPY)); + compressionTypes.add(Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY)); + + return new CreateTemplatePlan( + "treeTemplate", measurementList, dataTypeList, encodingList, compressionTypes); + } + private CreateTemplatePlan getCreateTemplatePlan() { List<List<String>> measurementList = new ArrayList<>(); measurementList.add(Collections.singletonList("s11")); + + List<String> measurements = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + measurements.add("vector.s" + i); + } + measurementList.add(measurements); + List<List<TSDataType>> dataTypeList = new ArrayList<>(); dataTypeList.add(Collections.singletonList(TSDataType.INT64)); - List<TSDataType> dataTypes = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - dataTypes.add(TSDataType.INT64); - } - dataTypeList.add(dataTypes); List<List<TSEncoding>> encodingList = new ArrayList<>(); encodingList.add(Collections.singletonList(TSEncoding.RLE)); - List<TSEncoding> encodings = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - encodings.add(TSEncoding.RLE); - } - encodingList.add(encodings); - List<CompressionType> compressionTypes = new ArrayList<>(); - compressionTypes.add(CompressionType.SNAPPY); + List<List<CompressionType>> compressionTypes = new ArrayList<>(); + List<CompressionType> compressorList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + compressorList.add(CompressionType.SNAPPY); + } + compressionTypes.add(Collections.singletonList(CompressionType.SNAPPY)); + compressionTypes.add(compressorList); List<String> schemaNames = new ArrayList<>(); - schemaNames.add("s11"); + schemaNames.add("s21"); + schemaNames.add("vector"); return new CreateTemplatePlan( "template1", schemaNames, measurementList, dataTypeList, encodingList, compressionTypes); @@@ -1185,76 -1340,31 +1286,76 @@@ manager.showTimeseries(showTimeSeriesPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); assertEquals(1, result.size()); assertEquals("root.laptop.d1.s0", result.get(0).getName()); + } catch (MetadataException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } - // show timeseries - // showTimeSeriesPlan = - // new ShowTimeSeriesPlan(new PartialPath("root"), false, null, null, 0, 0, false); - // result = manager.showTimeseries(showTimeSeriesPlan, - // EnvironmentUtils.TEST_QUERY_CONTEXT); - // assertEquals(4, result.size()); + @Test + public void testShowTimeseriesWithTemplate() { + List<List<String>> measurementList = new ArrayList<>(); + measurementList.add(Collections.singletonList("s0")); + measurementList.add(Collections.singletonList("s1")); - // show timeseries root.laptop.d1.vector - showTimeSeriesPlan = + List<List<TSDataType>> dataTypeList = new ArrayList<>(); + dataTypeList.add(Collections.singletonList(TSDataType.INT32)); + dataTypeList.add(Collections.singletonList(TSDataType.INT32)); + + List<List<TSEncoding>> encodingList = new ArrayList<>(); + encodingList.add(Collections.singletonList(TSEncoding.RLE)); + encodingList.add(Collections.singletonList(TSEncoding.RLE)); + - List<CompressionType> compressionTypes = new ArrayList<>(); ++ List<List<CompressionType>> compressionTypes = new ArrayList<>(); + for (int i = 0; i < 2; i++) { - compressionTypes.add(compressionType); ++ compressionTypes.add(Collections.singletonList(compressionType)); + } + + List<String> schemaNames = new ArrayList<>(); + schemaNames.add("s0"); + schemaNames.add("s1"); + + CreateTemplatePlan plan = + new CreateTemplatePlan( + "template1", + schemaNames, + measurementList, + dataTypeList, + encodingList, + compressionTypes); + MManager manager = IoTDB.metaManager; + try { + manager.createSchemaTemplate(plan); + + // set device template + SetSchemaTemplatePlan setSchemaTemplatePlan = + new SetSchemaTemplatePlan("template1", "root.laptop.d1"); + manager.setSchemaTemplate(setSchemaTemplatePlan); + manager.setUsingSchemaTemplate(manager.getDeviceNode(new PartialPath("root.laptop.d1"))); + + // show timeseries root.laptop.d1.s0 + ShowTimeSeriesPlan showTimeSeriesPlan = new ShowTimeSeriesPlan( - new PartialPath("root.laptop.d1.vector.*"), false, null, null, 0, 0, false); - result = manager.showTimeseries(showTimeSeriesPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); - assertEquals(3, result.size()); - for (int i = 0; i < result.size(); i++) { - assertEquals("root.laptop.d1.vector.s" + (i + 1), result.get(i).getName()); - } + new PartialPath("root.laptop.d1.s0"), false, null, null, 0, 0, false); + List<ShowTimeSeriesResult> result = + manager.showTimeseries(showTimeSeriesPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); + assertEquals(1, result.size()); + assertEquals("root.laptop.d1.s0", result.get(0).getName()); - // show timeseries root.laptop.d1.vector.s1 + // show timeseries root.laptop.d1.(s1,s2,s3) showTimeSeriesPlan = - new ShowTimeSeriesPlan( - new PartialPath("root.laptop.d1.vector.s1"), false, null, null, 0, 0, false); + new ShowTimeSeriesPlan(new PartialPath("root.**"), false, null, null, 0, 0, false); result = manager.showTimeseries(showTimeSeriesPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); - assertEquals(1, result.size()); - assertEquals("root.laptop.d1.vector.s1", result.get(0).getName()); + assertEquals(2, result.size()); + Set<String> set = new HashSet<>(); + set.add("root.laptop.d1.s0"); + set.add("root.laptop.d1.s1"); + + for (ShowTimeSeriesResult showTimeSeriesResult : result) { + set.remove(showTimeSeriesResult.getName()); + } + + assertTrue(set.isEmpty()); } catch (MetadataException e) { e.printStackTrace(); fail(e.getMessage()); diff --cc server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java index 7ce8e27,1d513bf..4c42ee5 --- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java +++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java @@@ -107,68 -125,8 +107,68 @@@ public class InsertRowPlanTest } @Test + public void testInsertRowPlanWithSchemaTemplate() + throws QueryProcessException, MetadataException, InterruptedException, + QueryFilterOptimizationException, StorageEngineException, IOException { + List<List<String>> measurementList = new ArrayList<>(); + for (int i = 1; i <= 6; i++) { + measurementList.add(Collections.singletonList("s" + i)); + } + + List<List<TSDataType>> dataTypesList = new ArrayList<>(); + dataTypesList.add(Collections.singletonList(TSDataType.DOUBLE)); + dataTypesList.add(Collections.singletonList(TSDataType.FLOAT)); + dataTypesList.add(Collections.singletonList(TSDataType.INT64)); + dataTypesList.add(Collections.singletonList(TSDataType.INT32)); + dataTypesList.add(Collections.singletonList(TSDataType.BOOLEAN)); + dataTypesList.add(Collections.singletonList(TSDataType.TEXT)); + + List<List<TSEncoding>> encodingList = new ArrayList<>(); + for (int i = 1; i <= 6; i++) { + encodingList.add(Collections.singletonList(TSEncoding.PLAIN)); + } + - List<CompressionType> compressionTypes = new ArrayList<>(); ++ List<List<CompressionType>> compressionTypes = new ArrayList<>(); + for (int i = 1; i <= 6; i++) { - compressionTypes.add(CompressionType.SNAPPY); ++ compressionTypes.add(Collections.singletonList(CompressionType.SNAPPY)); + } + + List<String> schemaNames = new ArrayList<>(); + for (int i = 1; i <= 6; i++) { + schemaNames.add("s" + i); + } + + CreateTemplatePlan plan = + new CreateTemplatePlan( + "template1", + schemaNames, + measurementList, + dataTypesList, + encodingList, + compressionTypes); + + IoTDB.metaManager.createSchemaTemplate(plan); + IoTDB.metaManager.setSchemaTemplate(new SetSchemaTemplatePlan("template1", "root.isp.d1")); + + IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false); + + InsertRowPlan rowPlan = getInsertRowPlan(); + + PlanExecutor executor = new PlanExecutor(); + executor.insert(rowPlan); + + QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1"); + QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); + Assert.assertEquals(6, dataSet.getPaths().size()); + while (dataSet.hasNext()) { + RowRecord record = dataSet.next(); + Assert.assertEquals(6, record.getFields().size()); + } + } + + @Test public void testInsertRowSerialization() throws IllegalPathException, QueryProcessException { - InsertRowPlan plan1 = getInsertVectorRowPlan(); + InsertRowPlan plan1 = getInsertAlignedRowPlan(); PlanExecutor executor = new PlanExecutor(); executor.insert(plan1); diff --cc server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java index 262d712,3b4de84..c9c3e47 --- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java +++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java @@@ -284,13 -266,24 +284,17 @@@ public class InsertTabletPlanTest dataTypesList.add(Collections.singletonList(TSDataType.TEXT)); List<List<TSEncoding>> encodingList = new ArrayList<>(); - List<TSEncoding> e1 = new ArrayList<>(); - e1.add(TSEncoding.PLAIN); - e1.add(TSEncoding.PLAIN); - e1.add(TSEncoding.PLAIN); - encodingList.add(e1); - List<TSEncoding> e2 = new ArrayList<>(); - e2.add(TSEncoding.PLAIN); - e2.add(TSEncoding.PLAIN); - encodingList.add(e2); - encodingList.add(Collections.singletonList(TSEncoding.PLAIN)); + for (int i = 1; i <= 6; i++) { + encodingList.add(Collections.singletonList(TSEncoding.PLAIN)); + } - List<CompressionType> compressionTypes = new ArrayList<>(); - for (int i = 0; i < 6; i++) { - compressionTypes.add(CompressionType.SNAPPY); + List<List<CompressionType>> compressionTypes = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + List<CompressionType> compressorList = new ArrayList<>(); + for (int j = 0; j < 3; j++) { + compressorList.add(CompressionType.SNAPPY); + } + compressionTypes.add(compressorList); } List<String> schemaNames = new ArrayList<>();
