This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch cluster- in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5bdf9187b5a6a74904b5d2f64ff1b564188b8ee8 Merge: 4f92cc5 b484f2e Author: xiangdong huang <[email protected]> AuthorDate: Tue Aug 3 19:17:27 2021 +0800 temporary commit for refactor thrift rpc README_ZH.md | 2 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 73 +- .../org/apache/iotdb/cluster/ClusterIoTDB.java | 159 ++-- .../iotdb/cluster/config/ClusterConstant.java | 52 + .../iotdb/cluster/coordinator/Coordinator.java | 8 +- .../iotdb/cluster/log/applier/BaseApplier.java | 17 +- .../log/manage/PartitionedSnapshotLogManager.java | 8 +- .../apache/iotdb/cluster/metadata/CMManager.java | 223 ++--- .../apache/iotdb/cluster/metadata/MetaPuller.java | 27 +- .../cluster/query/ClusterPhysicalGenerator.java | 18 +- .../iotdb/cluster/query/ClusterPlanExecutor.java | 44 +- .../apache/iotdb/cluster/query/ClusterPlanner.java | 17 +- .../iotdb/cluster/query/ClusterQueryRouter.java | 40 + .../cluster/query/ClusterUDTFQueryExecutor.java | 111 +++ .../iotdb/cluster/query/LocalQueryExecutor.java | 32 +- .../cluster/query/aggregate/ClusterAggregator.java | 9 +- .../cluster/query/fill/ClusterPreviousFill.java | 10 +- .../query/groupby/RemoteGroupByExecutor.java | 27 +- .../query/last/ClusterLastQueryExecutor.java | 26 +- .../cluster/query/manage/ClusterQueryManager.java | 8 +- .../cluster/query/reader/ClusterReaderFactory.java | 9 +- .../cluster/query/reader/ClusterTimeGenerator.java | 14 +- .../iotdb/cluster/query/reader/DataSourceInfo.java | 28 +- .../reader/RemoteSeriesReaderByTimestamp.java | 3 + .../query/reader/RemoteSimpleSeriesReader.java | 3 + .../query/reader/mult/MultDataSourceInfo.java | 8 +- .../query/reader/mult/RemoteMultSeriesReader.java | 10 +- .../iotdb/cluster/server/ClusterRPCService.java | 17 +- .../iotdb/cluster/server/ClusterTSServiceImpl.java | 112 +-- .../iotdb/cluster/server/MetaClusterServer.java | 369 ------- .../iotdb/cluster/server/MetaClusterServer2.java | 372 ++++++++ .../apache/iotdb/cluster/server/RaftServer.java | 263 ----- .../cluster/server/heartbeat/HeartbeatThread.java | 6 + .../server/heartbeat/MetaHeartbeatServer.java | 13 +- .../cluster/server/member/MetaGroupMember.java | 72 +- .../apache/iotdb/cluster/utils/PartitionUtils.java | 4 +- .../cluster/utils/nodetool/ClusterMonitor.java | 11 +- .../org/apache/iotdb/cluster/common/IoTDBTest.java | 7 +- .../org/apache/iotdb/cluster/common/TestUtils.java | 7 +- .../cluster/integration/BaseSingleNodeTest.java | 9 +- .../cluster/log/applier/DataLogApplierTest.java | 40 +- .../cluster/log/logtypes/SerializeLogTest.java | 4 +- .../cluster/log/snapshot/FileSnapshotTest.java | 2 +- .../iotdb/cluster/partition/MManagerWhiteBox.java | 2 +- .../query/ClusterAggregateExecutorTest.java | 4 +- .../query/ClusterDataQueryExecutorTest.java | 10 +- .../cluster/query/ClusterFillExecutorTest.java | 4 +- .../query/ClusterPhysicalGeneratorTest.java | 3 +- .../cluster/query/ClusterPlanExecutorTest.java | 8 +- .../cluster/query/ClusterQueryRouterTest.java | 44 +- .../query/ClusterUDTFQueryExecutorTest.java | 116 +++ .../iotdb/cluster/query/LoadConfigurationTest.java | 2 +- .../ClusterGroupByNoVFilterDataSetTest.java | 2 +- .../groupby/ClusterGroupByVFilterDataSetTest.java | 2 +- .../query/groupby/MergeGroupByExecutorTest.java | 4 +- .../query/groupby/RemoteGroupByExecutorTest.java | 4 +- .../query/manage/ClusterQueryManagerTest.java | 12 +- .../query/reader/ClusterReaderFactoryTest.java | 2 +- .../query/reader/ClusterTimeGeneratorTest.java | 4 +- .../clusterinfo/ClusterInfoServiceImplTest.java | 13 +- .../cluster/server/member/DataGroupMemberTest.java | 14 +- .../cluster/server/member/MetaGroupMemberTest.java | 12 +- docs/Development/ContributeGuide.md | 37 +- docs/SystemDesign/SchemaManager/SchemaManager.md | 39 +- docs/SystemDesign/TsFile/Format.md | 477 +++++----- docs/UserGuide/Advanced-Features/Select-Into.md | 235 +++++ .../Programming-MQTT.md | 79 +- .../UserGuide/Ecosystem Integration/Flink IoTDB.md | 3 +- .../DDL-Data-Definition-Language.md | 6 +- .../UserGuide/System-Tools/Load-External-Tsfile.md | 32 +- docs/zh/Community/Community-Powered By.md | 41 +- docs/zh/Community/Feedback.md | 18 +- docs/zh/Development/Committer.md | 38 +- docs/zh/Development/ContributeGuide.md | 59 +- docs/zh/Development/HowToCommit.md | 40 +- docs/zh/Development/VoteRelease.md | 7 +- docs/zh/Download/README.md | 73 +- docs/zh/SystemDesign/Architecture/Architecture.md | 1 - docs/zh/SystemDesign/Client/RPC.md | 3 - docs/zh/SystemDesign/Connector/Hive-TsFile.md | 11 +- docs/zh/SystemDesign/Connector/Spark-IOTDB.md | 39 +- docs/zh/SystemDesign/Connector/Spark-TsFile.md | 25 +- docs/zh/SystemDesign/DataQuery/AggregationQuery.md | 31 +- .../SystemDesign/DataQuery/AlignByDeviceQuery.md | 12 +- docs/zh/SystemDesign/DataQuery/FillFunction.md | 30 +- docs/zh/SystemDesign/DataQuery/GroupByFillQuery.md | 37 +- docs/zh/SystemDesign/DataQuery/GroupByQuery.md | 48 +- docs/zh/SystemDesign/DataQuery/LastQuery.md | 22 +- docs/zh/SystemDesign/DataQuery/OrderByTimeQuery.md | 75 +- .../zh/SystemDesign/DataQuery/QueryFundamentals.md | 58 +- docs/zh/SystemDesign/DataQuery/RawDataQuery.md | 62 +- docs/zh/SystemDesign/DataQuery/SeriesReader.md | 24 +- docs/zh/SystemDesign/QueryEngine/Planner.md | 1 - .../QueryEngine/ResultSetConstruction.md | 2 +- .../zh/SystemDesign/SchemaManager/SchemaManager.md | 190 ++-- docs/zh/SystemDesign/StorageEngine/Compaction.md | 40 +- .../SystemDesign/StorageEngine/DataManipulation.md | 41 +- .../zh/SystemDesign/StorageEngine/DataPartition.md | 24 +- docs/zh/SystemDesign/StorageEngine/FileLists.md | 39 +- docs/zh/SystemDesign/StorageEngine/FlushManager.md | 4 +- docs/zh/SystemDesign/StorageEngine/MergeManager.md | 22 +- docs/zh/SystemDesign/StorageEngine/Recover.md | 29 +- .../zh/SystemDesign/StorageEngine/StorageEngine.md | 2 +- docs/zh/SystemDesign/StorageEngine/WAL.md | 4 +- docs/zh/SystemDesign/Tools/Sync.md | 105 +- docs/zh/SystemDesign/TsFile/Format.md | 584 ++++++------ docs/zh/SystemDesign/TsFile/Read.md | 110 +-- docs/zh/SystemDesign/TsFile/TsFile.md | 1 - docs/zh/SystemDesign/TsFile/Write.md | 3 +- .../zh/UserGuide/API/Programming-Cpp-Native-API.md | 87 +- docs/zh/UserGuide/API/Programming-Go-Native-API.md | 18 +- docs/zh/UserGuide/API/Programming-JDBC.md | 14 +- .../UserGuide/API/Programming-Java-Native-API.md | 59 +- .../UserGuide/API/Programming-Python-Native-API.md | 70 +- docs/zh/UserGuide/API/Programming-TsFile-API.md | 137 ++- docs/zh/UserGuide/API/Time-zone.md | 1 - .../Administration-Management/Administration.md | 35 +- docs/zh/UserGuide/Advanced-Features/Alerting.md | 15 +- .../Advanced-Features/Continuous-Query.md | 17 +- docs/zh/UserGuide/Advanced-Features/Select-Into.md | 234 +++++ docs/zh/UserGuide/Advanced-Features/Triggers.md | 170 +--- .../Advanced-Features/UDF-User-Defined-Function.md | 225 ++--- docs/zh/UserGuide/Appendix/Config-Manual.md | 139 ++- docs/zh/UserGuide/Appendix/SQL-Reference.md | 160 ++-- docs/zh/UserGuide/Appendix/Status-Codes.md | 19 +- docs/zh/UserGuide/CLI/Command-Line-Interface.md | 123 ++- docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md | 12 +- docs/zh/UserGuide/Cluster/Cluster-Setup.md | 67 +- .../Collaboration-of-Edge-and-Cloud/Sync-Tool.md | 40 +- .../Programming-MQTT.md | 111 ++- .../Programming-Thrift.md | 30 +- docs/zh/UserGuide/Comparison/TSDB-Comparison.md | 201 ++-- docs/zh/UserGuide/Data-Concept/Compression.md | 12 +- .../Data-Concept/Data-Model-and-Terminology.md | 59 +- docs/zh/UserGuide/Data-Concept/Data-Type.md | 4 +- docs/zh/UserGuide/Data-Concept/Encoding.md | 24 +- docs/zh/UserGuide/Data-Concept/SDT.md | 30 +- docs/zh/UserGuide/Ecosystem Integration/DBeaver.md | 20 +- .../UserGuide/Ecosystem Integration/Flink IoTDB.md | 5 +- .../Ecosystem Integration/Flink TsFile.md | 17 +- docs/zh/UserGuide/Ecosystem Integration/Grafana.md | 72 +- .../UserGuide/Ecosystem Integration/Hive TsFile.md | 55 +- .../Ecosystem Integration/MapReduce TsFile.md | 39 +- .../UserGuide/Ecosystem Integration/Spark IoTDB.md | 17 +- .../Ecosystem Integration/Spark TsFile.md | 88 +- .../Ecosystem Integration/Writing Data on HDFS.md | 46 +- .../Ecosystem Integration/Zeppelin-IoTDB.md | 51 +- .../zh/UserGuide/FAQ/Frequently-asked-questions.md | 33 +- .../UserGuide/IoTDB-Introduction/Architecture.md | 14 +- docs/zh/UserGuide/IoTDB-Introduction/Features.md | 8 +- .../zh/UserGuide/IoTDB-Introduction/Publication.md | 2 +- docs/zh/UserGuide/IoTDB-Introduction/Scenario.md | 20 +- .../UserGuide/IoTDB-Introduction/What-is-IoTDB.md | 4 +- .../DDL-Data-Definition-Language.md | 72 +- .../DML-Data-Manipulation-Language.md | 364 ++++--- .../IoTDB-SQL-Language/Maintenance-Command.md | 10 +- docs/zh/UserGuide/QuickStart/Files.md | 17 +- docs/zh/UserGuide/QuickStart/QuickStart.md | 78 +- docs/zh/UserGuide/QuickStart/ServerFileList.md | 64 +- docs/zh/UserGuide/QuickStart/WayToGetIoTDB.md | 43 +- docs/zh/UserGuide/System-Tools/CSV-Tool.md | 24 +- docs/zh/UserGuide/System-Tools/JMX-Tool.md | 24 +- .../UserGuide/System-Tools/Load-External-Tsfile.md | 58 +- docs/zh/UserGuide/System-Tools/MLogParser-Tool.md | 7 +- .../System-Tools/Monitor-and-Log-Tools.md | 96 +- docs/zh/UserGuide/System-Tools/NodeTool.md | 117 ++- .../Query-History-Visualization-Tool.md | 6 +- docs/zh/UserGuide/System-Tools/Watermark-Tool.md | 34 +- docs/zh/UserGuide/UserGuideReadme.md | 9 +- .../org/apache/iotdb/flink/FlinkIoTDBSink.java | 26 +- .../RPC.md => example/mqtt-customize/README.md | 45 +- example/mqtt-customize/pom.xml | 42 + .../server/CustomizedJsonPayloadFormatter.java | 62 ++ .../org.apache.iotdb.db.mqtt.PayloadFormatter | 1 + example/pom.xml | 1 + .../iotdb/HybridTimeseriesSessionExample.java | 129 +++ .../org/apache/iotdb/SessionConcurrentExample.java | 199 ++++ .../main/java/org/apache/iotdb/SessionExample.java | 14 + .../iotdb/tsfile/TsFileWriteVectorWithTablet.java | 89 +- .../java/org/apache/iotdb/flink/IoTDBSink.java | 26 +- .../iotdb/flink/options/IoTDBSinkOptions.java | 11 - ...tITCase.java => RowTSRecordOutputFormatIT.java} | 2 +- ...rmatITCase.java => RowTsFileInputFormatIT.java} | 2 +- pom.xml | 2 + .../resources/conf/iotdb-engine.properties | 24 + .../org/apache/iotdb/db/auth/AuthorityChecker.java | 1 + .../db/concurrent/IoTDBThreadPoolFactory.java | 18 + .../org/apache/iotdb/db/concurrent/ThreadName.java | 5 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 50 + .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 43 +- .../apache/iotdb/db/cq/ContinuousQueryTask.java | 6 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 183 ++-- .../db/engine/cache/TimeSeriesMetadataCache.java | 71 +- .../level/LevelCompactionTsFileManagement.java | 2 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 15 +- .../apache/iotdb/db/engine/memtable/IMemTable.java | 2 + .../iotdb/db/engine/merge/task/MergeTask.java | 10 +- .../selectinto/InsertTabletPlanGenerator.java | 254 +++++ .../selectinto/InsertTabletPlansIterator.java | 139 +++ .../engine/storagegroup/StorageGroupProcessor.java | 196 ++-- .../db/engine/storagegroup/TsFileProcessor.java | 10 +- .../db/engine/storagegroup/TsFileResource.java | 10 +- .../storagegroup/timeindex/DeviceTimeIndex.java | 13 +- .../storagegroup/timeindex/FileTimeIndex.java | 5 + .../engine/storagegroup/timeindex/ITimeIndex.java | 8 + .../storagegroup/timeindex/TimeIndexLevel.java | 10 - .../virtualSg/VirtualStorageGroupManager.java | 110 ++- .../db/engine/trigger/executor/TriggerEngine.java | 16 +- .../engine/trigger/executor/TriggerExecutor.java | 8 +- .../service/TriggerRegistrationService.java | 10 +- .../exception/query/PathNumOverLimitException.java | 13 +- .../org/apache/iotdb/db/metadata/MManager.java | 1004 ++++++-------------- .../java/org/apache/iotdb/db/metadata/MTree.java | 657 ++++++++----- .../org/apache/iotdb/db/metadata/MetaUtils.java | 18 +- .../iotdb/db/metadata/MetadataOperationType.java | 4 + .../db/metadata/{ => logfile}/MLogTxtWriter.java | 71 +- .../iotdb/db/metadata/logfile/MLogWriter.java | 24 +- .../db/metadata/{ => logfile}/TagLogFile.java | 2 +- .../iotdb/db/metadata/mnode/EntityMNode.java | 117 +++ .../iotdb/db/metadata/mnode/IEntityMNode.java | 55 ++ .../org/apache/iotdb/db/metadata/mnode/IMNode.java | 82 ++ .../iotdb/db/metadata/mnode/IMeasurementMNode.java | 58 ++ .../mnode/IStorageGroupMNode.java} | 14 +- .../iotdb/db/metadata/mnode/InternalMNode.java | 271 ++++++ .../org/apache/iotdb/db/metadata/mnode/MNode.java | 339 +------ .../iotdb/db/metadata/mnode/MeasurementMNode.java | 207 ++-- ...roupMNode.java => StorageGroupEntityMNode.java} | 23 +- .../iotdb/db/metadata/mnode/StorageGroupMNode.java | 11 +- .../apache/iotdb/db/metadata/tag/TagManager.java | 556 +++++++++++ .../iotdb/db/metadata/template/Template.java | 11 +- .../db/metadata/template/TemplateManager.java | 141 +++ .../org/apache/iotdb/db/monitor/StatMonitor.java | 2 +- .../main/java/org/apache/iotdb/db/qp/Planner.java | 51 +- .../apache/iotdb/db/qp/constant/SQLConstant.java | 4 + .../apache/iotdb/db/qp/executor/PlanExecutor.java | 73 +- .../org/apache/iotdb/db/qp/logical/Operator.java | 6 +- .../iotdb/db/qp/logical/crud/QueryOperator.java | 185 ++-- .../db/qp/logical/crud/SelectIntoOperator.java | 110 +++ .../iotdb/db/qp/logical/sys/LoadFilesOperator.java | 25 +- .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 38 +- .../db/qp/physical/crud/AlignByDevicePlan.java | 57 +- .../iotdb/db/qp/physical/crud/InsertPlan.java | 12 +- .../iotdb/db/qp/physical/crud/InsertRowPlan.java | 3 +- .../iotdb/db/qp/physical/crud/MeasurementInfo.java | 75 ++ .../iotdb/db/qp/physical/crud/SelectIntoPlan.java | 113 +++ ...emplatePlan.java => SetSchemaTemplatePlan.java} | 14 +- .../iotdb/db/qp/physical/sys/OperateFilePlan.java | 23 +- ...tePlan.java => SetUsingSchemaTemplatePlan.java} | 16 +- .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 191 ++-- .../iotdb/db/qp/strategy/LogicalChecker.java | 5 + .../iotdb/db/qp/strategy/PhysicalGenerator.java | 24 +- .../qp/strategy/optimizer/ConcatPathOptimizer.java | 8 +- .../qp/strategy/optimizer/ILogicalOptimizer.java | 3 +- .../apache/iotdb/db/qp/utils/WildcardsRemover.java | 28 +- .../db/query/control/QueryResourceManager.java | 100 +- .../iotdb/db/query/control/QueryTimeManager.java | 14 + .../iotdb/db/query/control/SessionManager.java | 10 +- .../apache/iotdb/db/query/control/TracingInfo.java | 85 ++ .../iotdb/db/query/control/TracingManager.java | 141 ++- .../db/query/dataset/AlignByDeviceDataSet.java | 26 +- .../iotdb/db/query/executor/LastQueryExecutor.java | 6 +- .../iotdb/db/query/reader/series/SeriesReader.java | 99 +- .../reader/universal/DescPriorityMergeReader.java | 5 +- .../reader/universal/PriorityMergeReader.java | 13 +- .../apache/iotdb/db/rescon/MemTableManager.java | 4 + .../apache/iotdb/db/rescon/TVListAllocator.java | 12 +- .../java/org/apache/iotdb/db/service/IoTDB.java | 4 +- .../org/apache/iotdb/db/service/RPCService.java | 1 + .../org/apache/iotdb/db/service/ServiceType.java | 7 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 313 +++--- .../org/apache/iotdb/db/service/UpgradeSevice.java | 9 +- .../iotdb/db/service/thrift/ThriftService.java | 18 +- .../db/service/thrift/ThriftServiceThread.java | 220 ++++- .../iotdb/db/sync/receiver/SyncServerManager.java | 7 +- .../apache/iotdb/db/tools/TsFileSketchTool.java | 583 ++++++++---- .../org/apache/iotdb/db/tools/mlog/MLogParser.java | 19 +- .../org/apache/iotdb/db/utils/SchemaUtils.java | 21 +- .../iotdb/db/writelog/recover/LogReplayer.java | 6 +- .../writelog/recover/TsFileRecoverPerformer.java | 26 +- .../db/engine/memtable/MemTableFlushTaskTest.java | 22 +- .../db/engine/memtable/MemTableTestUtils.java | 5 +- .../db/engine/memtable/PrimitiveMemTableTest.java | 3 +- .../storagegroup/StorageGroupProcessorTest.java | 84 +- .../iotdb/db/engine/storagegroup/TTLTest.java | 9 +- .../engine/storagegroup/TsFileProcessorTest.java | 14 +- .../iotdb/db/integration/IoTDBAddSubDeviceIT.java | 11 +- .../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 3 +- .../org/apache/iotdb/db/integration/IoTDBAsIT.java | 2 +- .../db/integration/IoTDBAutoCreateSchemaIT.java | 6 +- .../db/integration/IoTDBContinuousQueryIT.java | 147 ++- .../db/integration/IoTDBCreateStorageGroupIT.java | 128 +++ .../db/integration/IoTDBCreateTimeseriesIT.java | 34 +- .../db/integration/IoTDBInsertWithoutTimeIT.java | 129 +++ .../apache/iotdb/db/integration/IoTDBLastIT.java | 22 +- .../db/integration/IoTDBLoadExternalTsfileIT.java | 121 ++- .../iotdb/db/integration/IoTDBMetadataFetchIT.java | 20 +- .../db/integration/IoTDBQueryMemoryControlIT.java | 20 +- .../iotdb/db/integration/IoTDBRestartIT.java | 48 + .../iotdb/db/integration/IoTDBSelectIntoIT.java | 617 ++++++++++++ .../db/integration/IoTDBSequenceDataQueryIT.java | 12 +- .../iotdb/db/integration/IoTDBSeriesReaderIT.java | 11 +- .../iotdb/db/integration/IoTDBSimpleQueryIT.java | 14 +- .../db/integration/IoTDBTriggerExecutionIT.java | 43 +- .../iotdb/db/metadata/MManagerAdvancedTest.java | 20 +- .../iotdb/db/metadata/MManagerBasicTest.java | 170 ++-- .../iotdb/db/metadata/MManagerImproveTest.java | 8 +- .../org/apache/iotdb/db/metadata/MTreeTest.java | 103 +- .../apache/iotdb/db/metadata/MetaUtilsTest.java | 14 +- .../apache/iotdb/db/metadata/mnode/MNodeTest.java | 41 +- .../iotdb/db/qp/logical/LogicalPlanSmallTest.java | 2 +- .../iotdb/db/qp/physical/InsertRowPlanTest.java | 8 +- .../iotdb/db/qp/physical/InsertTabletPlanTest.java | 14 +- .../iotdb/db/qp/physical/PhysicalPlanTest.java | 83 +- .../iotdb/db/query/control/TracingManagerTest.java | 7 +- .../query/reader/series/SeriesReaderTestUtil.java | 2 +- .../org/apache/iotdb/db/tools/MLogParserTest.java | 71 +- .../iotdb/db/tools/TsFileSketchToolTest.java | 4 +- .../apache/iotdb/db/utils/EnvironmentUtils.java | 11 +- .../iotdb/db/utils/TsFileRewriteToolTest.java | 4 +- .../iotdb/db/writelog/recover/LogReplayerTest.java | 3 +- .../db/writelog/recover/SeqTsFileRecoverTest.java | 2 +- .../apache/iotdb/session/IoTDBSessionSimpleIT.java | 6 +- .../apache/iotdb/session/IoTDBSessionVectorIT.java | 213 +++++ .../java/org/apache/iotdb/session/SessionTest.java | 2 +- site/README.md | 2 +- site/src/main/.vuepress/config.js | 31 +- site/src/main/.vuepress/theme/components/Page.vue | 15 +- .../apache/iotdb/spark/db/EnvironmentUtils.java | 111 ++- .../test/java/org/apache/iotdb/db/sql/Cases.java | 235 ++--- .../file/metadata/MetadataIndexConstructor.java | 52 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 17 +- .../org/apache/iotdb/tsfile/read/common/Path.java | 4 + .../java/org/apache/iotdb/tsfile/utils/BitMap.java | 5 + .../apache/iotdb/tsfile/write/TsFileWriter.java | 12 +- .../tsfile/write/chunk/ChunkGroupWriterImpl.java | 77 +- .../tsfile/write/chunk/VectorChunkWriterImpl.java | 5 +- .../tsfile/write/schema/IMeasurementSchema.java | 5 + .../tsfile/write/schema/MeasurementSchema.java | 12 +- .../apache/iotdb/tsfile/write/schema/Schema.java | 28 +- .../write/schema/VectorMeasurementSchema.java | 12 +- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 10 + ...ateTest.java => DefaultSchemaTemplateTest.java} | 8 +- .../tsfile/write/MetadataIndexConstructorTest.java | 478 ++++++++++ .../write/schema/converter/SchemaBuilderTest.java | 4 +- .../write/writer/VectorChunkWriterImplTest.java | 34 +- .../write/writer/VectorMeasurementSchemaStub.java | 12 +- 346 files changed, 12967 insertions(+), 7924 deletions(-) diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java index cec70b2,adc4661..4fcb5a5 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java @@@ -22,33 -22,28 +22,40 @@@ import org.apache.iotdb.cluster.client. import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor; import org.apache.iotdb.cluster.config.ClusterConfig; import org.apache.iotdb.cluster.config.ClusterDescriptor; ++import org.apache.iotdb.cluster.coordinator.Coordinator; import org.apache.iotdb.cluster.exception.ConfigInconsistentException; import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; ++import org.apache.iotdb.cluster.metadata.CMManager; ++import org.apache.iotdb.cluster.metadata.MetaPuller; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotStrategy; import org.apache.iotdb.cluster.rpc.thrift.Node; -import org.apache.iotdb.cluster.server.MetaClusterServer; +import org.apache.iotdb.cluster.server.ClusterRPCService; - import org.apache.iotdb.cluster.server.MetaClusterServer; ++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl; import org.apache.iotdb.cluster.server.Response; import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer; ++import org.apache.iotdb.cluster.server.member.MetaGroupMember; ++import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService; ++import org.apache.iotdb.cluster.server.raft.MetaRaftService; ++import org.apache.iotdb.cluster.server.service.MetaAsyncService; ++import org.apache.iotdb.cluster.server.service.MetaSyncService; import org.apache.iotdb.cluster.utils.ClusterUtils; +import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor; import org.apache.iotdb.db.conf.IoTDBConfigCheck; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.service.JMXService; +import org.apache.iotdb.db.service.RegisterManager; ++import org.apache.iotdb.db.service.thrift.ThriftServiceThread; import org.apache.iotdb.db.utils.TestOnly; -- import org.apache.thrift.TException; import org.apache.thrift.async.TAsyncClientManager; import org.apache.thrift.protocol.TBinaryProtocol.Factory; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocolFactory; --import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -58,12 -53,9 +65,13 @@@ import java.util.Set import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP; - //we do not inherent IoTDB instance, as it may break the singleton mode of IoTDB. -public class ClusterMain { ++// we do not inherent IoTDB instance, as it may break the singleton mode of IoTDB. +public class ClusterIoTDB { - private static final Logger logger = LoggerFactory.getLogger(ClusterMain.class); + private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class); + private final String mbeanName = - String.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "ClusterIoTDB"); ++ String.format( ++ "%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "ClusterIoTDB"); // establish the cluster as a seed private static final String MODE_START = "-s"; @@@ -73,13 -65,8 +81,27 @@@ // metaport-of-removed-node private static final String MODE_REMOVE = "-r"; - private MetaClusterServer metaServer; - private static MetaClusterServer metaServer; ++ private MetaGroupMember metaGroupEngine; ++ private Node thisNode; ++ private Coordinator coordinator; + + private IoTDB iotdb = IoTDB.getInstance(); + + // Cluster IoTDB uses a individual registerManager with its parent. + private RegisterManager registerManager = new RegisterManager(); + ++ private ClusterIoTDB() { ++ ClusterConfig config = ClusterDescriptor.getInstance().getConfig(); ++ thisNode = new Node(); ++ // set internal rpc ip and ports ++ thisNode.setInternalIp(config.getInternalIp()); ++ thisNode.setMetaPort(config.getInternalMetaPort()); ++ thisNode.setDataPort(config.getInternalDataPort()); ++ // set client rpc ip and ports ++ thisNode.setClientPort(config.getClusterRpcPort()); ++ thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress()); ++ } + public static void main(String[] args) { if (args.length < 1) { logger.error( @@@ -115,18 -103,50 +137,18 @@@ String mode = args[0]; logger.info("Running mode {}", mode); + ClusterIoTDB cluster = ClusterIoTDBHolder.INSTANCE; - //we start IoTDB kernel first. - cluster.iotdb.active(); ++ // we start IoTDB kernel first. ++ // cluster.iotdb.active(); + - //then we start the cluster module. ++ // then we start the cluster module. if (MODE_START.equals(mode)) { - try { - metaServer = new MetaClusterServer(); - startServerCheck(); - preStartCustomize(); - metaServer.start(); - metaServer.buildCluster(); - // Currently, we do not register ClusterInfoService as a JMX Bean, - // so we use startService() rather than start() - ClusterInfoServer.getInstance().startService(); - } catch (TTransportException - | StartupException - | QueryProcessException - | StartUpCheckFailureException - | ConfigInconsistentException e) { - metaServer.stop(); - logger.error("Fail to start meta server", e); - } + cluster.activeStartNodeMode(); } else if (MODE_ADD.equals(mode)) { - try { - long startTime = System.currentTimeMillis(); - metaServer = new MetaClusterServer(); - preStartCustomize(); - metaServer.start(); - metaServer.joinCluster(); - // Currently, we do not register ClusterInfoService as a JMX Bean, - // so we use startService() rather than start() - ClusterInfoServer.getInstance().startService(); - - logger.info( - "Adding this node {} to cluster costs {} ms", - metaServer.getMember().getThisNode(), - (System.currentTimeMillis() - startTime)); - } catch (TTransportException - | StartupException - | QueryProcessException - | StartUpCheckFailureException - | ConfigInconsistentException e) { - metaServer.stop(); - logger.error("Fail to join cluster", e); - } + cluster.activeAddNodeMode(); } else if (MODE_REMOVE.equals(mode)) { try { - doRemoveNode(args); + cluster.doRemoveNode(args); } catch (IOException e) { logger.error("Fail to remove node in cluster", e); } @@@ -135,61 -155,7 +157,94 @@@ } } - private static void startServerCheck() throws StartupException { + public void activeStartNodeMode() { + try { - metaServer = new MetaClusterServer(); + startServerCheck(); + preStartCustomize(); - metaServer.start(); - metaServer.buildCluster(); ++ ++ coordinator = new Coordinator(); ++ // register MetaGroupMember. MetaGroupMember has the same position with "StorageEngine" in the ++ // cluster moduel. ++ // TODO fixme it is better to remove coordinator out of metaGroupEngine ++ ++ // local engine ++ metaGroupEngine = ++ new MetaGroupMember( ++ ThriftServiceThread.getProtocolFactory( ++ IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()), ++ thisNode, ++ coordinator); ++ IoTDB.setMetaManager(CMManager.getInstance()); ++ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine); ++ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator); ++ MetaPuller.getInstance().init(metaGroupEngine); ++ iotdb.active(); ++ ++ registerManager.register(metaGroupEngine); ++ ++ metaGroupEngine.buildCluster(); ++ ++ // rpc service ++ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { ++ MetaAsyncService metaAsyncService = new MetaAsyncService(metaGroupEngine); ++ MetaRaftHeartBeatService.getInstance().initAsyncedServiceImpl(metaAsyncService); ++ MetaRaftService.getInstance().initAsyncedServiceImpl(metaAsyncService); ++ } else { ++ MetaSyncService syncService = new MetaSyncService(metaGroupEngine); ++ MetaRaftHeartBeatService.getInstance().initSyncedServiceImpl(syncService); ++ MetaRaftService.getInstance().initSyncedServiceImpl(syncService); ++ } ++ ++ // meta group heart beat rpc ++ registerManager.register(MetaRaftHeartBeatService.getInstance()); ++ registerManager.register(MetaRaftService.getInstance()); ++ + // Currently, we do not register ClusterInfoService as a JMX Bean, + // so we use startService() rather than start() + ClusterInfoServer.getInstance().startService(); + // JMX based DBA API + registerManager.register(ClusterMonitor.INSTANCE); + // we must wait until the metaGroup established. + // So that the ClusterRPCService can work. + registerManager.register(ClusterRPCService.getInstance()); - } catch (TTransportException - | StartupException ++ } catch (StartupException + | QueryProcessException + | StartUpCheckFailureException + | ConfigInconsistentException e) { + stop(); + logger.error("Fail to start meta server", e); + } + } + + public void activeAddNodeMode() { - try { - long startTime = System.currentTimeMillis(); - metaServer = new MetaClusterServer(); - preStartCustomize(); - metaServer.start(); - metaServer.joinCluster(); - // Currently, we do not register ClusterInfoService as a JMX Bean, - // so we use startService() rather than start() - ClusterInfoServer.getInstance().startService(); - // JMX based DBA API - registerManager.register(ClusterMonitor.INSTANCE); - //finally, we start the RPC service - registerManager.register(ClusterRPCService.getInstance()); - logger.info( - "Adding this node {} to cluster costs {} ms", - metaServer.getMember().getThisNode(), - (System.currentTimeMillis() - startTime)); - } catch (TTransportException - | StartupException - | QueryProcessException - | StartUpCheckFailureException - | ConfigInconsistentException e) { - stop(); - logger.error("Fail to join cluster", e); - } ++ // try { ++ // long startTime = System.currentTimeMillis(); ++ // metaServer = new RaftTSMetaServiceImpl(); ++ // preStartCustomize(); ++ // metaServer.start(); ++ // metaServer.joinCluster(); ++ // // Currently, we do not register ClusterInfoService as a JMX Bean, ++ // // so we use startService() rather than start() ++ // ClusterInfoServer.getInstance().startService(); ++ // // JMX based DBA API ++ // registerManager.register(ClusterMonitor.INSTANCE); ++ // // finally, we start the RPC service ++ // registerManager.register(ClusterRPCService.getInstance()); ++ // logger.info( ++ // "Adding this node {} to cluster costs {} ms", ++ // metaServer.getMember().getThisNode(), ++ // (System.currentTimeMillis() - startTime)); ++ // } catch (TTransportException ++ // | StartupException ++ // | QueryProcessException ++ // | StartUpCheckFailureException ++ // | ConfigInconsistentException e) { ++ // stop(); ++ // logger.error("Fail to join cluster", e); ++ // } + } + - + private void startServerCheck() throws StartupException { ClusterConfig config = ClusterDescriptor.getInstance().getConfig(); // check the initial replicateNum and refuse to start when the replicateNum <= 0 if (config.getReplicationNum() <= 0) { @@@ -236,18 -202,18 +291,12 @@@ } // assert this node is in seed nodes list -- Node localNode = new Node(); -- localNode -- .setInternalIp(config.getInternalIp()) -- .setMetaPort(config.getInternalMetaPort()) -- .setDataPort(config.getInternalDataPort()) -- .setClientPort(config.getClusterRpcPort()) -- .setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress()); -- if (!seedNodes.contains(localNode)) { ++ ++ if (!seedNodes.contains(thisNode)) { String message = String.format( "SeedNodes must contains local node in start-server mode. LocalNode: %s ,SeedNodes: %s", -- localNode.toString(), config.getSeedNodeUrls()); ++ thisNode.toString(), config.getSeedNodeUrls()); throw new StartupException(metaServer.getMember().getName(), message); } } @@@ -307,7 -273,7 +356,7 @@@ } } - public MetaClusterServer getMetaServer() { - public static MetaClusterServer getMetaServer() { ++ public RaftTSMetaServiceImpl getMetaServer() { return metaServer; } @@@ -358,35 -324,8 +407,33 @@@ }); } - - @TestOnly - public void setMetaClusterServer(MetaClusterServer metaClusterServer) { - public static void setMetaClusterServer(MetaClusterServer metaClusterServer) { -- metaServer = metaClusterServer; ++ public void setMetaClusterServer(RaftTSMetaServiceImpl RaftTSMetaServiceImpl) { ++ metaServer = RaftTSMetaServiceImpl; + } + + public void stop() { + deactivate(); + } + + private void deactivate() { + logger.info("Deactivating Cluster IoTDB..."); + metaServer.stop(); + registerManager.deregisterAll(); + JMXService.deregisterMBean(mbeanName); + logger.info("ClusterIoTDB is deactivated."); - //stop the iotdb kernel ++ // stop the iotdb kernel + iotdb.stop(); + } + - + public static ClusterIoTDB getInstance() { + return ClusterIoTDBHolder.INSTANCE; + } ++ + private static class ClusterIoTDBHolder { + + private static final ClusterIoTDB INSTANCE = new ClusterIoTDB(); + + private ClusterIoTDBHolder() {} } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java index b377b1a,b377b1a..f5b0a49 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java @@@ -19,6 -19,6 +19,7 @@@ package org.apache.iotdb.cluster.config; import org.apache.iotdb.cluster.rpc.thrift.Node; ++import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.db.utils.TestOnly; public class ClusterConstant { @@@ -67,4 -67,4 +68,55 @@@ public static void setElectionRandomTimeOutMs(long electionRandomTimeOutMs) { ClusterConstant.electionRandomTimeOutMs = electionRandomTimeOutMs; } ++ ++ private static int connectionTimeoutInMS = ++ ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS(); ++ private static int readOperationTimeoutMS = ++ ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS(); ++ private static int writeOperationTimeoutMS = ++ ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(); ++ private static int syncLeaderMaxWaitMs = 20 * 1000; ++ private static long heartBeatIntervalMs = 1000L; ++ ++ public static int getConnectionTimeoutInMS() { ++ return connectionTimeoutInMS; ++ } ++ ++ public static void setConnectionTimeoutInMS(int connectionTimeoutInMS) { ++ ClusterConstant.connectionTimeoutInMS = connectionTimeoutInMS; ++ } ++ ++ public static int getReadOperationTimeoutMS() { ++ return readOperationTimeoutMS; ++ } ++ ++ public static int getWriteOperationTimeoutMS() { ++ return writeOperationTimeoutMS; ++ } ++ ++ public static int getSyncLeaderMaxWaitMs() { ++ return syncLeaderMaxWaitMs; ++ } ++ ++ public static void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) { ++ ClusterConstant.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs; ++ } ++ ++ public static long getHeartBeatIntervalMs() { ++ return heartBeatIntervalMs; ++ } ++ ++ public static void setHeartBeatIntervalMs(long heartBeatIntervalMs) { ++ ClusterConstant.heartBeatIntervalMs = heartBeatIntervalMs; ++ } ++ ++ @TestOnly ++ public static void setReadOperationTimeoutMS(int readOperationTimeoutMS) { ++ RaftServer.readOperationTimeoutMS = readOperationTimeoutMS; ++ } ++ ++ @TestOnly ++ public static void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) { ++ RaftServer.writeOperationTimeoutMS = writeOperationTimeoutMS; ++ } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java index 8c79ac5,5bbddec..b8a2c00 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java @@@ -33,15 -26,21 +33,15 @@@ import org.apache.iotdb.db.service.thri import org.apache.iotdb.db.service.thrift.ThriftServiceThread; import org.apache.iotdb.service.rpc.thrift.TSIService.Processor; - public class ClusterRPCService extends ThriftService implements ClusterRPCServiceMBean { -/** A service to handle jdbc request from client. */ -public class RPCService extends ThriftService implements RPCServiceMBean { - - private TSServiceImpl impl; ++public class ClusterRPCService extends ThriftService implements ClusterRPCServiceMBean { - private RPCService() {} + private ClusterTSServiceImpl impl; - public static RPCService getInstance() { - return RPCServiceHolder.INSTANCE; - } + private ClusterRPCService() {} @Override - public int getRPCPort() { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - return config.getRpcPort(); + public ThriftService getImplementation() { + return ClusterRPCServiceHolder.INSTANCE; } @Override @@@ -50,9 -49,12 +50,13 @@@ } @Override -- public void initTProcessor() - throws IllegalAccessException, InstantiationException { - impl = new ClusterTSServiceImpl(); - throws ClassNotFoundException, IllegalAccessException, InstantiationException { - impl = - (TSServiceImpl) - Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName()) - .newInstance(); ++ public void initTProcessor() throws IllegalAccessException, InstantiationException { ++ try { ++ impl = new ClusterTSServiceImpl(); ++ initSyncedServiceImpl(null); ++ } catch (QueryProcessException e) { ++ throw new InstantiationException(e.getMessage()); ++ } processor = new Processor<>(impl); } @@@ -64,9 -66,9 +68,9 @@@ new ThriftServiceThread( processor, getID().getName(), - ThreadName.RPC_CLIENT.getName(), - config.getRpcAddress(), - config.getRpcPort(), + ThreadName.CLUSTER_RPC_CLIENT.getName(), - config.getRpcAddress(), - config.getRpcPort(), ++ getBindIP(), ++ getBindPort(), config.getRpcMaxConcurrentClientNum(), config.getThriftServerAwaitTimeForStopService(), new RPCServiceThriftHandler(impl), @@@ -88,27 -90,14 +92,26 @@@ } @Override - public ServiceType getID() { - return ServiceType.RPC_SERVICE; + public int getRPCPort() { + return getBindPort(); + } + + public static ClusterRPCService getInstance() { + return ClusterRPCServiceHolder.INSTANCE; + } + + public void assignExecutorToServiceImpl(MetaGroupMember member) throws QueryProcessException { + this.impl.setExecutor(member); + } + + public void assignCoordinator(Coordinator coordinator) { + this.impl.setCoordinator(coordinator); } - - private static class RPCServiceHolder { + private static class ClusterRPCServiceHolder { - private static final RPCService INSTANCE = new RPCService(); + private static final ClusterRPCService INSTANCE = new ClusterRPCService(); - private RPCServiceHolder() {} + private ClusterRPCServiceHolder() {} } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java index e43e503,0000000..f1fd75a mode 100644,000000..100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java @@@ -1,237 -1,0 +1,161 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.server; + +import org.apache.iotdb.cluster.client.async.AsyncDataClient; +import org.apache.iotdb.cluster.client.sync.SyncDataClient; ++import org.apache.iotdb.cluster.config.ClusterConstant; +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.coordinator.Coordinator; - import org.apache.iotdb.cluster.metadata.CMManager; +import org.apache.iotdb.cluster.query.ClusterPlanExecutor; - import org.apache.iotdb.cluster.query.ClusterPlanner; +import org.apache.iotdb.cluster.query.RemoteQueryContext; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; +import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; +import org.apache.iotdb.cluster.server.member.MetaGroupMember; +import org.apache.iotdb.db.exception.StorageEngineException; - import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.query.QueryProcessException; - import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.query.context.QueryContext; - import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.service.TSServiceImpl; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSStatus; - import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; - +import org.apache.thrift.TException; - import org.apache.thrift.protocol.TProtocol; - import org.apache.thrift.server.ServerContext; - import org.apache.thrift.server.TServerEventHandler; - import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; - import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +/** - * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the processing of - * the user requests (sqls and session api). It inherits the basic procedures from TSServiceImpl, - * but redirect the queries of data and metadata to a MetaGroupMember of the local node. ++ * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the ++ * processing of the user requests (sqls and session api). It inherits the basic procedures from ++ * TSServiceImpl, but redirect the queries of data and metadata to a MetaGroupMember of the local ++ * node. + */ +public class ClusterTSServiceImpl extends TSServiceImpl { + + private static final Logger logger = LoggerFactory.getLogger(ClusterTSServiceImpl.class); + /** - * The Coordinator of the local node. Through this node queries data and meta from - * the cluster and performs data manipulations to the cluster. ++ * The Coordinator of the local node. Through this node queries data and meta from the cluster and ++ * performs data manipulations to the cluster. + */ + private Coordinator coordinator; + - - // /** - // * Using the poolServer, ClusterTSServiceImpl will listen to a socket to accept thrift requests like an - // * HttpServer. - // */ - // private TServer poolServer; - // - // /** The socket poolServer will listen to. Async service requires nonblocking socket */ - // private TServerTransport serverTransport; - + /** + * queryId -> queryContext map. When a query ends either normally or accidentally, the resources + * used by the query can be found in the context and then released. + */ + private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>(); + - public ClusterTSServiceImpl(MetaGroupMember metaGroupMember) throws QueryProcessException { - super(); - this.processor = new ClusterPlanner(); - } ++ public ClusterTSServiceImpl() throws QueryProcessException {} + + public void setExecutor(MetaGroupMember metaGroupMember) throws QueryProcessException { - this.executor = new ClusterPlanExecutor(metaGroupMember); ++ executor = new ClusterPlanExecutor(metaGroupMember); + } + + public void setCoordinator(Coordinator coordinator) { + this.coordinator = coordinator; + } + - + /** + * Redirect the plan to the local Coordinator so that it will be processed cluster-wide. + * + * @param plan + * @return + */ + @Override + protected TSStatus executeNonQueryPlan(PhysicalPlan plan) { + try { + plan.checkIntegrity(); + } catch (QueryProcessException e) { + logger.warn("Illegal plan detected: {}", plan); + return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + + return coordinator.executeNonQueryPlan(plan); + } + + /** - * EventHandler handles the preprocess and postprocess of the thrift requests, but it currently - * only release resources when a client disconnects. - */ - class EventHandler implements TServerEventHandler { - - @Override - public void preServe() { - // do nothing - } - - @Override - public ServerContext createContext(TProtocol input, TProtocol output) { - return null; - } - - @Override - public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { - ClusterTSServiceImpl.this.handleClientExit(); - } - - @Override - public void processContext( - ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) { - // do nothing - } - } - - /** - * Get the data types of each path in “paths”. If "aggregations" is not null, then it should be - * corresponding to "paths" one to one and the data type will be the type of the aggregation over - * the corresponding path. - * - * @param paths full timeseries paths - * @param aggregations if not null, it should be the same size as "paths" - * @return the data types of "paths" (using the aggregations) - * @throws MetadataException - */ - @Override - protected List<TSDataType> getSeriesTypesByPaths( - List<PartialPath> paths, List<String> aggregations) throws MetadataException { - return ((CMManager) IoTDB.metaManager).getSeriesTypesByPath(paths, aggregations).left; - } - - /** - * Get the data types of each path in “paths”. If "aggregation" is not null, all "paths" will use - * this aggregation. - * - * @param paths full timeseries paths - * @param aggregation if not null, it means "paths" all use this aggregation - * @return the data types of "paths" (using the aggregation) - * @throws MetadataException - */ - protected List<TSDataType> getSeriesTypesByString(List<PartialPath> paths, String aggregation) - throws MetadataException { - return ((CMManager) IoTDB.metaManager).getSeriesTypesByPaths(paths, aggregation).left; - } - - /** + * Generate and cache a QueryContext using "queryId". In the distributed version, the QueryContext + * is a RemoteQueryContext. + * + * @param queryId + * @return a RemoteQueryContext using queryId + */ + @Override + protected QueryContext genQueryContext(long queryId, boolean debug) { + RemoteQueryContext context = new RemoteQueryContext(queryId, debug); + queryContextMap.put(queryId, context); + return context; + } + + /** + * Release the local and remote resources used by a query. + * + * @param queryId + * @throws StorageEngineException + */ + @Override + protected void releaseQueryResource(long queryId) throws StorageEngineException { + // release resources locally + super.releaseQueryResource(queryId); + // release resources remotely + RemoteQueryContext context = queryContextMap.remove(queryId); + if (context != null) { + // release the resources in every queried node + for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) { + RaftNode header = headerEntry.getKey(); + Set<Node> queriedNodes = headerEntry.getValue(); + + for (Node queriedNode : queriedNodes) { + GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>()); + try { + if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + AsyncDataClient client = + coordinator.getAsyncDataClient( - queriedNode, RaftServer.getReadOperationTimeoutMS()); ++ queriedNode, ClusterConstant.getReadOperationTimeoutMS()); + client.endQuery(header, coordinator.getThisNode(), queryId, handler); + } else { + try (SyncDataClient syncDataClient = + coordinator.getSyncDataClient( - queriedNode, RaftServer.getReadOperationTimeoutMS())) { - syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); ++ queriedNode, ClusterConstant.getReadOperationTimeoutMS())) { ++ try { ++ syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); ++ } catch (TException e) { ++ // the connection may be broken, close it to avoid it being reused ++ syncDataClient.getInputProtocol().getTransport().close(); ++ throw e; ++ } + } + } + } catch (IOException | TException e) { + logger.error("Cannot end query {} in {}", queryId, queriedNode); + } + } + } + } + } +} diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java index b0f8a25,fce7a87..0000000 deleted file mode 100644,100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java +++ /dev/null @@@ -1,369 -1,378 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, -- * software distributed under the License is distributed on an -- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -- * KIND, either express or implied. See the License for the -- * specific language governing permissions and limitations -- * under the License. -- */ --package org.apache.iotdb.cluster.server; -- - import org.apache.iotdb.cluster.ClusterIoTDB; --import org.apache.iotdb.cluster.config.ClusterDescriptor; --import org.apache.iotdb.cluster.coordinator.Coordinator; --import org.apache.iotdb.cluster.exception.ConfigInconsistentException; --import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; --import org.apache.iotdb.cluster.metadata.CMManager; --import org.apache.iotdb.cluster.metadata.MetaPuller; --import org.apache.iotdb.cluster.rpc.thrift.*; --import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor; --import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor; --import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer; --import org.apache.iotdb.cluster.server.member.MetaGroupMember; --import org.apache.iotdb.cluster.server.service.MetaAsyncService; --import org.apache.iotdb.cluster.server.service.MetaSyncService; --import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor; --import org.apache.iotdb.db.exception.StartupException; --import org.apache.iotdb.db.exception.query.QueryProcessException; --import org.apache.iotdb.db.service.IoTDB; --import org.apache.iotdb.db.service.RegisterManager; --import org.apache.iotdb.db.utils.TestOnly; --import org.apache.iotdb.service.rpc.thrift.TSStatus; -- --import org.apache.thrift.TException; --import org.apache.thrift.TProcessor; --import org.apache.thrift.async.AsyncMethodCallback; --import org.apache.thrift.transport.TNonblockingServerSocket; --import org.apache.thrift.transport.TServerSocket; --import org.apache.thrift.transport.TServerTransport; --import org.apache.thrift.transport.TTransportException; --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; -- --import java.net.InetSocketAddress; --import java.nio.ByteBuffer; -- --/** -- * MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the -- * data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is -- * started-up at the same time. -- */ --public class MetaClusterServer extends RaftServer -- implements TSMetaService.AsyncIface, TSMetaService.Iface { -- private static Logger logger = LoggerFactory.getLogger(MetaClusterServer.class); -- -- // each node only contains one MetaGroupMember -- private MetaGroupMember member; -- private Coordinator coordinator; - - - // the single-node IoTDB instance - private IoTDB ioTDB; - // to register the ClusterMonitor that helps monitoring the cluster - private RegisterManager registerManager = new RegisterManager(); -- private MetaAsyncService asyncService; -- private MetaSyncService syncService; -- private MetaHeartbeatServer metaHeartbeatServer; -- -- public MetaClusterServer() throws QueryProcessException { -- super(); -- metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this); -- coordinator = new Coordinator(); -- member = new MetaGroupMember(protocolFactory, thisNode, coordinator); -- coordinator.setMetaGroupMember(member); -- asyncService = new MetaAsyncService(member); -- syncService = new MetaSyncService(member); -- MetaPuller.getInstance().init(member); -- } -- -- /** -- * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the -- * ClusterMonitor are also started. -- * -- * @throws TTransportException -- * @throws StartupException -- */ -- @Override -- public void start() throws TTransportException, StartupException { -- super.start(); -- metaHeartbeatServer.start(); - - ioTDB = new IoTDB(); -- IoTDB.setMetaManager(CMManager.getInstance()); -- ((CMManager) IoTDB.metaManager).setMetaGroupMember(member); -- ((CMManager) IoTDB.metaManager).setCoordinator(coordinator); - //TODO FIXME move this out of MetaClusterServer - IoTDB.getInstance().active(); - - ioTDB.active(); -- member.start(); - - // JMX based DBA API - registerManager.register(ClusterMonitor.INSTANCE); -- } -- -- /** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */ -- @Override -- public void stop() { - - if (ioTDB == null) { - return; - } -- metaHeartbeatServer.stop(); -- super.stop(); - ioTDB.stop(); - ioTDB = null; -- member.stop(); - registerManager.deregisterAll(); -- } -- -- /** Build a initial cluster with other nodes on the seed list. */ -- public void buildCluster() throws ConfigInconsistentException, StartUpCheckFailureException { -- member.buildCluster(); -- } -- -- /** -- * Pick up a node from the seed list and send a join request to it. -- * -- * @return whether the node has joined the cluster. -- */ -- public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException { -- member.joinCluster(); -- } -- -- /** -- * MetaClusterServer uses the meta port to create the socket. -- * -- * @return the TServerTransport -- * @throws TTransportException if create the socket fails -- */ -- @Override -- TServerTransport getServerSocket() throws TTransportException { -- logger.info( -- "[{}] Cluster node will listen {}:{}", -- getServerClientName(), -- config.getInternalIp(), -- config.getInternalMetaPort()); -- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { -- return new TNonblockingServerSocket( -- new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()), -- getConnectionTimeoutInMS()); -- } else { -- return new TServerSocket( -- new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort())); -- } -- } -- -- @Override -- String getClientThreadPrefix() { -- return "MetaClientThread-"; -- } -- -- @Override -- String getServerClientName() { -- return "MetaServerThread-"; -- } -- -- @Override -- TProcessor getProcessor() { -- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { -- return new AsyncProcessor<>(this); -- } else { -- return new Processor<>(this); -- } -- } -- -- // Request forwarding. There is only one MetaGroupMember each node, so all requests will be -- // directly sent to that member. See the methods in MetaGroupMember for details -- -- @Override -- public void addNode(Node node, StartUpStatus startUpStatus, AsyncMethodCallback resultHandler) { -- asyncService.addNode(node, startUpStatus, resultHandler); -- } -- -- @Override -- public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) { -- asyncService.sendHeartbeat(request, resultHandler); -- } -- -- @Override -- public void startElection(ElectionRequest electionRequest, AsyncMethodCallback resultHandler) { -- asyncService.startElection(electionRequest, resultHandler); -- } -- -- @Override -- public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler) { -- asyncService.appendEntries(request, resultHandler); -- } -- -- @Override -- public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) { -- asyncService.appendEntry(request, resultHandler); -- } -- -- @Override -- public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback resultHandler) { -- asyncService.sendSnapshot(request, resultHandler); -- } -- -- @Override -- public void executeNonQueryPlan( -- ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) { -- asyncService.executeNonQueryPlan(request, resultHandler); -- } -- -- @Override -- public void requestCommitIndex( -- RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) { -- asyncService.requestCommitIndex(header, resultHandler); -- } -- -- @Override -- public void checkAlive(AsyncMethodCallback<Node> resultHandler) { -- asyncService.checkAlive(resultHandler); -- } -- -- @Override -- public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) { -- asyncService.collectMigrationStatus(resultHandler); -- } -- -- @Override -- public void readFile( -- String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) { -- asyncService.readFile(filePath, offset, length, resultHandler); -- } -- -- @Override -- public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) { -- asyncService.queryNodeStatus(resultHandler); -- } -- -- public MetaGroupMember getMember() { -- return member; -- } -- -- @Override -- public void checkStatus( -- StartUpStatus startUpStatus, AsyncMethodCallback<CheckStatusResponse> resultHandler) { -- asyncService.checkStatus(startUpStatus, resultHandler); -- } -- -- @Override -- public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) { -- asyncService.removeNode(node, resultHandler); -- } -- -- @Override -- public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) { -- asyncService.exile(removeNodeLog, resultHandler); -- } -- -- @Override -- public void matchTerm( -- long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) { -- asyncService.matchTerm(index, term, header, resultHandler); -- } -- -- @Override -- public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws TException { -- return syncService.addNode(node, startUpStatus); -- } -- -- @Override -- public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) { -- return syncService.checkStatus(startUpStatus); -- } -- -- @Override -- public long removeNode(Node node) throws TException { -- return syncService.removeNode(node); -- } -- -- @Override -- public void exile(ByteBuffer removeNodeLog) { -- syncService.exile(removeNodeLog); -- } -- -- @Override -- public TNodeStatus queryNodeStatus() { -- return syncService.queryNodeStatus(); -- } -- -- @Override -- public Node checkAlive() { -- return syncService.checkAlive(); -- } -- -- @Override -- public ByteBuffer collectMigrationStatus() { -- return syncService.collectMigrationStatus(); -- } -- -- @Override -- public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) { -- return syncService.sendHeartbeat(request); -- } -- -- @Override -- public long startElection(ElectionRequest request) { -- return syncService.startElection(request); -- } -- -- @Override -- public long appendEntries(AppendEntriesRequest request) throws TException { -- return syncService.appendEntries(request); -- } -- -- @Override -- public long appendEntry(AppendEntryRequest request) throws TException { -- return syncService.appendEntry(request); -- } -- -- @Override -- public void sendSnapshot(SendSnapshotRequest request) throws TException { -- syncService.sendSnapshot(request); -- } -- -- @Override -- public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException { -- return syncService.executeNonQueryPlan(request); -- } -- -- @Override -- public RequestCommitIndexResponse requestCommitIndex(RaftNode header) throws TException { -- return syncService.requestCommitIndex(header); -- } -- -- @Override -- public ByteBuffer readFile(String filePath, long offset, int length) throws TException { -- return syncService.readFile(filePath, offset, length); -- } -- -- @Override -- public boolean matchTerm(long index, long term, RaftNode header) { -- return syncService.matchTerm(index, term, header); -- } -- -- @Override -- public void removeHardLink(String hardLinkPath) throws TException { -- syncService.removeHardLink(hardLinkPath); -- } -- -- @Override -- public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) { -- asyncService.removeHardLink(hardLinkPath, resultHandler); -- } -- -- @Override -- public void handshake(Node sender) { -- syncService.handshake(sender); -- } -- -- @Override -- public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) { -- asyncService.handshake(sender, resultHandler); -- } -- -- @TestOnly -- public void setMetaGroupMember(MetaGroupMember metaGroupMember) { -- this.member = metaGroupMember; -- } -- - @TestOnly - public IoTDB getIoTDB() { - return ioTDB; - } --} diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java index 0000000,0000000..65db372 new file mode 100644 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java @@@ -1,0 -1,0 +1,372 @@@ ++/// * ++// * Licensed to the Apache Software Foundation (ASF) under one ++// * or more contributor license agreements. See the NOTICE file ++// * distributed with this work for additional information ++// * regarding copyright ownership. The ASF licenses this file ++// * to you under the Apache License, Version 2.0 (the ++// * "License"); you may not use this file except in compliance ++// * with the License. You may obtain a copy of the License at ++// * ++// * http://www.apache.org/licenses/LICENSE-2.0 ++// * ++// * Unless required by applicable law or agreed to in writing, ++// * software distributed under the License is distributed on an ++// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++// * KIND, either express or implied. See the License for the ++// * specific language governing permissions and limitations ++// * under the License. ++// */ ++// package org.apache.iotdb.cluster.server; ++// ++// import org.apache.iotdb.cluster.config.ClusterDescriptor; ++// import org.apache.iotdb.cluster.coordinator.Coordinator; ++// import org.apache.iotdb.cluster.exception.ConfigInconsistentException; ++// import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; ++// import org.apache.iotdb.cluster.metadata.CMManager; ++// import org.apache.iotdb.cluster.metadata.MetaPuller; ++// import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; ++// import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; ++// import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; ++// import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse; ++// import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; ++// import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; ++// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest; ++// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; ++// import org.apache.iotdb.cluster.rpc.thrift.Node; ++// import org.apache.iotdb.cluster.rpc.thrift.RaftNode; ++// import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; ++// import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; ++// import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus; ++// import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus; ++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService; ++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor; ++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor; ++// import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer; ++// import org.apache.iotdb.cluster.server.member.MetaGroupMember; ++// import org.apache.iotdb.cluster.server.service.MetaAsyncService; ++// import org.apache.iotdb.cluster.server.service.MetaSyncService; ++// import org.apache.iotdb.db.exception.StartupException; ++// import org.apache.iotdb.db.exception.query.QueryProcessException; ++// import org.apache.iotdb.db.service.IoTDB; ++// import org.apache.iotdb.db.utils.TestOnly; ++// import org.apache.iotdb.service.rpc.thrift.TSStatus; ++// import org.apache.thrift.TException; ++// import org.apache.thrift.async.AsyncMethodCallback; ++// import org.apache.thrift.transport.TNonblockingServerSocket; ++// import org.slf4j.Logger; ++// import org.slf4j.LoggerFactory; ++// ++// import java.net.InetSocketAddress; ++// import java.nio.ByteBuffer; ++// ++/// ** ++// * MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the ++// * data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is ++// * started-up at the same time. ++// */ ++// public class MetaClusterServer2 extends RaftServer ++// implements TSMetaService.AsyncIface, TSMetaService.Iface { ++// private static Logger logger = LoggerFactory.getLogger(MetaClusterServer2.class); ++// ++// // each node only contains one MetaGroupMember ++// private MetaGroupMember member; ++// private Coordinator coordinator; ++// ++// private MetaAsyncService asyncService; ++// private MetaSyncService syncService; ++// private MetaHeartbeatServer metaHeartbeatServer; ++// ++// public MetaClusterServer2() throws QueryProcessException { ++// super(); ++// metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this); ++// coordinator = new Coordinator(); ++// member = new MetaGroupMember(protocolFactory, thisNode, coordinator); ++// coordinator.setMetaGroupMember(member); ++// asyncService = new MetaAsyncService(member); ++// syncService = new MetaSyncService(member); ++// MetaPuller.getInstance().init(member); ++// } ++// ++// /** ++// * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the ++// * ClusterMonitor are also started. ++// * ++// * @throws TTransportException ++// * @throws StartupException ++// */ ++// @Override ++// public void start() throws TTransportException, StartupException { ++// super.start(); ++// metaHeartbeatServer.start(); ++// ++// IoTDB.setMetaManager(CMManager.getInstance()); ++// ((CMManager) IoTDB.metaManager).setMetaGroupMember(member); ++// ((CMManager) IoTDB.metaManager).setCoordinator(coordinator); ++// // TODO FIXME move this out of MetaClusterServer ++// IoTDB.getInstance().active(); ++// ++// member.start(); ++// } ++// ++// /** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */ ++// @Override ++// public void stop() { ++// ++// metaHeartbeatServer.stop(); ++// super.stop(); ++// member.stop(); ++// } ++// ++// /** Build a initial cluster with other nodes on the seed list. */ ++// public void buildCluster() throws ConfigInconsistentException, StartUpCheckFailureException { ++// member.buildCluster(); ++// } ++// ++// /** ++// * Pick up a node from the seed list and send a join request to it. ++// * ++// * @return whether the node has joined the cluster. ++// */ ++// public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException { ++// member.joinCluster(); ++// } ++// ++// /** ++// * MetaClusterServer uses the meta port to create the socket. ++// * ++// * @return the TServerTransport ++// * @throws TTransportException if create the socket fails ++// */ ++// @Override ++// TServerTransport getServerSocket() throws TTransportException { ++// logger.info( ++// "[{}] Cluster node will listen {}:{}", ++// getServerClientName(), ++// config.getInternalIp(), ++// config.getInternalMetaPort()); ++// if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { ++// return new TNonblockingServerSocket( ++// new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()), ++// getConnectionTimeoutInMS()); ++// } else { ++// return new TServerSocket( ++// new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort())); ++// } ++// } ++// ++// @Override ++// String getClientThreadPrefix() { ++// return "MetaClientThread-"; ++// } ++// ++// @Override ++// String getServerClientName() { ++// return "MetaServerThread-"; ++// } ++// ++// @Override ++// TProcessor getProcessor() { ++// if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { ++// return new AsyncProcessor<>(this); ++// } else { ++// return new Processor<>(this); ++// } ++// } ++// ++// // Request forwarding. There is only one MetaGroupMember each node, so all requests will be ++// // directly sent to that member. See the methods in MetaGroupMember for details ++// ++// @Override ++// public void addNode(Node node, StartUpStatus startUpStatus, AsyncMethodCallback resultHandler) { ++// asyncService.addNode(node, startUpStatus, resultHandler); ++// } ++// ++// @Override ++// public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) { ++// asyncService.sendHeartbeat(request, resultHandler); ++// } ++// ++// @Override ++// public void startElection(ElectionRequest electionRequest, AsyncMethodCallback resultHandler) { ++// asyncService.startElection(electionRequest, resultHandler); ++// } ++// ++// @Override ++// public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler) { ++// asyncService.appendEntries(request, resultHandler); ++// } ++// ++// @Override ++// public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) { ++// asyncService.appendEntry(request, resultHandler); ++// } ++// ++// @Override ++// public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback resultHandler) { ++// asyncService.sendSnapshot(request, resultHandler); ++// } ++// ++// @Override ++// public void executeNonQueryPlan( ++// ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) { ++// asyncService.executeNonQueryPlan(request, resultHandler); ++// } ++// ++// @Override ++// public void requestCommitIndex( ++// RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) { ++// asyncService.requestCommitIndex(header, resultHandler); ++// } ++// ++// @Override ++// public void checkAlive(AsyncMethodCallback<Node> resultHandler) { ++// asyncService.checkAlive(resultHandler); ++// } ++// ++// @Override ++// public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) { ++// asyncService.collectMigrationStatus(resultHandler); ++// } ++// ++// @Override ++// public void readFile( ++// String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) { ++// asyncService.readFile(filePath, offset, length, resultHandler); ++// } ++// ++// @Override ++// public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) { ++// asyncService.queryNodeStatus(resultHandler); ++// } ++// ++// public MetaGroupMember getMember() { ++// return member; ++// } ++// ++// @Override ++// public void checkStatus( ++// StartUpStatus startUpStatus, AsyncMethodCallback<CheckStatusResponse> resultHandler) { ++// asyncService.checkStatus(startUpStatus, resultHandler); ++// } ++// ++// @Override ++// public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) { ++// asyncService.removeNode(node, resultHandler); ++// } ++// ++// @Override ++// public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) { ++// asyncService.exile(removeNodeLog, resultHandler); ++// } ++// ++// @Override ++// public void matchTerm( ++// long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) { ++// asyncService.matchTerm(index, term, header, resultHandler); ++// } ++// ++// @Override ++// public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws TException { ++// return syncService.addNode(node, startUpStatus); ++// } ++// ++// @Override ++// public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) { ++// return syncService.checkStatus(startUpStatus); ++// } ++// ++// @Override ++// public long removeNode(Node node) throws TException { ++// return syncService.removeNode(node); ++// } ++// ++// @Override ++// public void exile(ByteBuffer removeNodeLog) { ++// syncService.exile(removeNodeLog); ++// } ++// ++// @Override ++// public TNodeStatus queryNodeStatus() { ++// return syncService.queryNodeStatus(); ++// } ++// ++// @Override ++// public Node checkAlive() { ++// return syncService.checkAlive(); ++// } ++// ++// @Override ++// public ByteBuffer collectMigrationStatus() { ++// return syncService.collectMigrationStatus(); ++// } ++// ++// @Override ++// public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) { ++// return syncService.sendHeartbeat(request); ++// } ++// ++// @Override ++// public long startElection(ElectionRequest request) { ++// return syncService.startElection(request); ++// } ++// ++// @Override ++// public long appendEntries(AppendEntriesRequest request) throws TException { ++// return syncService.appendEntries(request); ++// } ++// ++// @Override ++// public long appendEntry(AppendEntryRequest request) throws TException { ++// return syncService.appendEntry(request); ++// } ++// ++// @Override ++// public void sendSnapshot(SendSnapshotRequest request) throws TException { ++// syncService.sendSnapshot(request); ++// } ++// ++// @Override ++// public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException { ++// return syncService.executeNonQueryPlan(request); ++// } ++// ++// @Override ++// public RequestCommitIndexResponse requestCommitIndex(RaftNode header) throws TException { ++// return syncService.requestCommitIndex(header); ++// } ++// ++// @Override ++// public ByteBuffer readFile(String filePath, long offset, int length) throws TException { ++// return syncService.readFile(filePath, offset, length); ++// } ++// ++// @Override ++// public boolean matchTerm(long index, long term, RaftNode header) { ++// return syncService.matchTerm(index, term, header); ++// } ++// ++// @Override ++// public void removeHardLink(String hardLinkPath) throws TException { ++// syncService.removeHardLink(hardLinkPath); ++// } ++// ++// @Override ++// public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) { ++// asyncService.removeHardLink(hardLinkPath, resultHandler); ++// } ++// ++// @Override ++// public void handshake(Node sender) { ++// syncService.handshake(sender); ++// } ++// ++// @Override ++// public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) { ++// asyncService.handshake(sender, resultHandler); ++// } ++// ++// @TestOnly ++// public void setMetaGroupMember(MetaGroupMember metaGroupMember) { ++// this.member = metaGroupMember; ++// } ++// } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java index 09956d2,09956d2..0000000 deleted file mode 100644,100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java +++ /dev/null @@@ -1,263 -1,263 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, -- * software distributed under the License is distributed on an -- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -- * KIND, either express or implied. See the License for the -- * specific language governing permissions and limitations -- * under the License. -- */ -- --package org.apache.iotdb.cluster.server; -- --import org.apache.iotdb.cluster.config.ClusterConfig; --import org.apache.iotdb.cluster.config.ClusterDescriptor; --import org.apache.iotdb.cluster.rpc.thrift.Node; --import org.apache.iotdb.cluster.rpc.thrift.RaftService; --import org.apache.iotdb.cluster.utils.ClusterUtils; --import org.apache.iotdb.db.conf.IoTDBDescriptor; --import org.apache.iotdb.db.exception.StartupException; --import org.apache.iotdb.db.utils.CommonUtils; --import org.apache.iotdb.db.utils.TestOnly; --import org.apache.iotdb.rpc.RpcTransportFactory; -- --import org.apache.thrift.TProcessor; --import org.apache.thrift.protocol.TBinaryProtocol; --import org.apache.thrift.protocol.TCompactProtocol; --import org.apache.thrift.protocol.TProtocolFactory; --import org.apache.thrift.server.TServer; --import org.apache.thrift.server.TThreadedSelectorServer; --import org.apache.thrift.transport.TNonblockingServerTransport; --import org.apache.thrift.transport.TServerTransport; --import org.apache.thrift.transport.TTransportException; --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; -- --import java.util.ConcurrentModificationException; --import java.util.concurrent.ExecutorService; --import java.util.concurrent.Executors; --import java.util.concurrent.SynchronousQueue; --import java.util.concurrent.ThreadFactory; --import java.util.concurrent.ThreadPoolExecutor; --import java.util.concurrent.atomic.AtomicLong; -- --/** -- * RaftServer works as a broker (network and protocol layer) that sends the requests to the proper -- * RaftMembers to process. -- */ --public abstract class RaftServer implements RaftService.AsyncIface, RaftService.Iface { -- -- private static final Logger logger = LoggerFactory.getLogger(RaftServer.class); -- private static int connectionTimeoutInMS = -- ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS(); -- private static int readOperationTimeoutMS = -- ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS(); -- private static int writeOperationTimeoutMS = -- ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(); -- private static int syncLeaderMaxWaitMs = 20 * 1000; -- private static long heartBeatIntervalMs = 1000L; -- -- ClusterConfig config = ClusterDescriptor.getInstance().getConfig(); -- // the socket poolServer will listen to -- private TServerTransport socket; -- // RPC processing server -- private TServer poolServer; -- Node thisNode; -- -- TProtocolFactory protocolFactory = -- config.isRpcThriftCompressionEnabled() -- ? new TCompactProtocol.Factory() -- : new TBinaryProtocol.Factory(); -- -- // this thread pool is to run the thrift server (poolServer above) -- private ExecutorService clientService; -- -- RaftServer() { -- thisNode = new Node(); -- // set internal rpc ip and ports -- thisNode.setInternalIp(config.getInternalIp()); -- thisNode.setMetaPort(config.getInternalMetaPort()); -- thisNode.setDataPort(config.getInternalDataPort()); -- // set client rpc ip and ports -- thisNode.setClientPort(config.getClusterRpcPort()); -- thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress()); -- } -- -- RaftServer(Node thisNode) { -- this.thisNode = thisNode; -- } -- -- public static int getConnectionTimeoutInMS() { -- return connectionTimeoutInMS; -- } -- -- public static void setConnectionTimeoutInMS(int connectionTimeoutInMS) { -- RaftServer.connectionTimeoutInMS = connectionTimeoutInMS; -- } -- -- public static int getReadOperationTimeoutMS() { -- return readOperationTimeoutMS; -- } -- -- public static int getWriteOperationTimeoutMS() { -- return writeOperationTimeoutMS; -- } -- -- public static int getSyncLeaderMaxWaitMs() { -- return syncLeaderMaxWaitMs; -- } -- -- public static void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) { -- RaftServer.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs; -- } -- -- public static long getHeartBeatIntervalMs() { -- return heartBeatIntervalMs; -- } -- -- public static void setHeartBeatIntervalMs(long heartBeatIntervalMs) { -- RaftServer.heartBeatIntervalMs = heartBeatIntervalMs; -- } -- -- /** -- * Establish a thrift server with the configurations in ClusterConfig to listen to and respond to -- * thrift RPCs. Calling the method twice does not induce side effects. -- * -- * @throws TTransportException -- */ -- @SuppressWarnings("java:S1130") // thrown in override method -- public void start() throws TTransportException, StartupException { -- if (poolServer != null) { -- return; -- } -- -- establishServer(); -- } -- -- /** -- * Stop the thrift server, close the socket and interrupt all in progress RPCs. Calling the method -- * twice does not induce side effects. -- */ -- public void stop() { -- if (poolServer == null) { -- return; -- } -- -- try { -- poolServer.stop(); -- } catch (ConcurrentModificationException e) { -- // ignore -- } -- socket.close(); -- clientService.shutdownNow(); -- socket = null; -- poolServer = null; -- } -- -- /** -- * @return An AsyncProcessor that contains the extended interfaces of a non-abstract subclass of -- * RaftService (DataService or MetaService). -- */ -- abstract TProcessor getProcessor(); -- -- /** -- * @return A socket that will be used to establish a thrift server to listen to RPC requests. -- * DataServer and MetaServer use different port, so this is to be determined. -- * @throws TTransportException -- */ -- abstract TServerTransport getServerSocket() throws TTransportException; -- -- /** -- * Each thrift RPC request will be processed in a separate thread and this will return the name -- * prefix of such threads. This is used to fast distinguish DataServer and MetaServer in the logs -- * for the sake of debug. -- * -- * @return name prefix of RPC processing threads. -- */ -- abstract String getClientThreadPrefix(); -- -- /** -- * The thrift server will be run in a separate thread, and this will be its name. It help you -- * locate the desired logs quickly when debugging. -- * -- * @return The name of the thread running the thrift server. -- */ -- abstract String getServerClientName(); -- -- private TServer createAsyncServer() throws TTransportException { -- socket = getServerSocket(); -- TThreadedSelectorServer.Args poolArgs = -- new TThreadedSelectorServer.Args((TNonblockingServerTransport) socket); -- poolArgs.maxReadBufferBytes = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize(); -- poolArgs.selectorThreads(CommonUtils.getCpuCores()); -- int maxConcurrentClientNum = -- Math.max(CommonUtils.getCpuCores(), config.getMaxConcurrentClientNum()); -- poolArgs.executorService( -- new ThreadPoolExecutor( -- CommonUtils.getCpuCores(), -- maxConcurrentClientNum, -- poolArgs.getStopTimeoutVal(), -- poolArgs.getStopTimeoutUnit(), -- new SynchronousQueue<>(), -- new ThreadFactory() { -- private AtomicLong threadIndex = new AtomicLong(0); -- -- @Override -- public Thread newThread(Runnable r) { -- return new Thread(r, getClientThreadPrefix() + threadIndex.incrementAndGet()); -- } -- })); -- poolArgs.processor(getProcessor()); -- poolArgs.protocolFactory(protocolFactory); -- // async service requires FramedTransport -- poolArgs.transportFactory(RpcTransportFactory.INSTANCE); -- -- // run the thrift server in a separate thread so that the main thread is not blocked -- return new TThreadedSelectorServer(poolArgs); -- } -- -- private TServer createSyncServer() throws TTransportException { -- socket = getServerSocket(); -- return ClusterUtils.createTThreadPoolServer( -- socket, getClientThreadPrefix(), getProcessor(), protocolFactory); -- } -- -- private void establishServer() throws TTransportException { -- logger.info( -- "[{}] Cluster node {} begins to set up with {} mode", -- getServerClientName(), -- thisNode, -- ClusterDescriptor.getInstance().getConfig().isUseAsyncServer() ? "Async" : "Sync"); -- -- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { -- poolServer = createAsyncServer(); -- } else { -- poolServer = createSyncServer(); -- } -- -- clientService = Executors.newSingleThreadExecutor(r -> new Thread(r, getServerClientName())); -- -- clientService.submit(() -> poolServer.serve()); -- -- logger.info("[{}] Cluster node {} is up", getServerClientName(), thisNode); -- } -- -- @TestOnly -- public static void setReadOperationTimeoutMS(int readOperationTimeoutMS) { -- RaftServer.readOperationTimeoutMS = readOperationTimeoutMS; -- } -- -- @TestOnly -- public static void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) { -- RaftServer.writeOperationTimeoutMS = writeOperationTimeoutMS; -- } --} diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java index ed99c3d,ed99c3d..348cda0 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java @@@ -23,9 -23,9 +23,8 @@@ import org.apache.iotdb.cluster.config. import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor; import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor; --import org.apache.iotdb.cluster.server.MetaClusterServer; ++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl; import org.apache.iotdb.cluster.utils.ClusterUtils; -- import org.apache.thrift.TProcessor; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TServerSocket; @@@ -39,22 -39,22 +38,22 @@@ import java.net.InetSocketAddress public class MetaHeartbeatServer extends HeartbeatServer { private static Logger logger = LoggerFactory.getLogger(MetaHeartbeatServer.class); -- private MetaClusterServer metaClusterServer; ++ private RaftTSMetaServiceImpl RaftTSMetaServiceImpl; /** Do not use this method for initialization */ private MetaHeartbeatServer() {} -- public MetaHeartbeatServer(Node thisNode, MetaClusterServer metaClusterServer) { ++ public MetaHeartbeatServer(Node thisNode, RaftTSMetaServiceImpl RaftTSMetaServiceImpl) { super(thisNode); -- this.metaClusterServer = metaClusterServer; ++ this.RaftTSMetaServiceImpl = RaftTSMetaServiceImpl; } @Override TProcessor getProcessor() { if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { -- return new AsyncProcessor<>(metaClusterServer); ++ return new AsyncProcessor<>(RaftTSMetaServiceImpl); } else { -- return new Processor<>(metaClusterServer); ++ return new Processor<>(RaftTSMetaServiceImpl); } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 39ebf3c,f917916..62ce37e --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@@ -65,11 -65,11 +65,9 @@@ import org.apache.iotdb.cluster.rpc.thr import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus; import org.apache.iotdb.cluster.rpc.thrift.TSMetaService; import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient; - import org.apache.iotdb.cluster.server.ClusterTSServiceImpl; -import org.apache.iotdb.cluster.server.ClientServer; import org.apache.iotdb.cluster.server.DataClusterServer; import org.apache.iotdb.cluster.server.HardLinkCleaner; import org.apache.iotdb.cluster.server.NodeCharacter; --import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.Response; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler; @@@ -87,20 -87,20 +85,22 @@@ import org.apache.iotdb.cluster.utils.S import org.apache.iotdb.cluster.utils.nodetool.function.Status; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; ++import org.apache.iotdb.db.exception.ShutdownException; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.PhysicalPlan; ++import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.IoTDB; ++import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals; import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -- import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TTransportException; @@@ -145,7 -145,7 +145,7 @@@ import static org.apache.iotdb.cluster. import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult; @SuppressWarnings("java:S1135") --public class MetaGroupMember extends RaftMember { ++public class MetaGroupMember extends RaftMember implements IService { /** the file that contains the identifier of this node */ static final String NODE_IDENTIFIER_FILE_NAME = @@@ -209,12 -209,12 +209,6 @@@ /** each node starts a data heartbeat server to transfer heartbeat requests */ private DataHeartbeatServer dataHeartbeatServer; -- /** -- * an override of TSServiceImpl, which redirect JDBC and Session requests to the MetaGroupMember -- * so they can be processed cluster-wide -- */ - private ClusterTSServiceImpl clusterTSServiceImpl; - private ClientServer clientServer; -- private DataClientProvider dataClientProvider; /** @@@ -276,7 -276,7 +270,6 @@@ Factory dataMemberFactory = new Factory(factory, this); dataClusterServer = new DataClusterServer(thisNode, dataMemberFactory, this); dataHeartbeatServer = new DataHeartbeatServer(thisNode, dataClusterServer); - clusterTSServiceImpl = new ClusterTSServiceImpl(this); - clientServer = new ClientServer(this); startUpStatus = getNewStartUpStatus(); // try loading the partition table if there was a previous cluster @@@ -333,8 -333,8 +326,8 @@@ } /** - * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClusterTSServiceImpl and reportThread. - * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClientServer and reportThread. -- * Calling the method twice does not induce side effects. ++ * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClusterTSServiceImpl and ++ * reportThread. Calling the method twice does not induce side effects. */ @Override public void stop() { @@@ -345,9 -345,9 +338,6 @@@ if (getDataHeartbeatServer() != null) { getDataHeartbeatServer().stop(); } - if (clusterTSServiceImpl != null) { - clusterTSServiceImpl.stop(); - if (clientServer != null) { - clientServer.stop(); -- } if (reportThread != null) { reportThread.shutdownNow(); try { @@@ -370,15 -370,15 +360,29 @@@ logger.info("{}: stopped", name); } ++ @Override ++ public void waitAndStop(long milliseconds) { ++ IService.super.waitAndStop(milliseconds); ++ } ++ ++ @Override ++ public void shutdown(long milliseconds) throws ShutdownException { ++ IService.super.shutdown(milliseconds); ++ } ++ ++ @Override ++ public ServiceType getID() { ++ return ServiceType.CLUSTER_META_ENGINE; ++ } ++ /** - * Start DataClusterServer and ClusterTSServiceImpl so this node will be able to respond to other nodes - * Start DataClusterServer and ClientServer so this node will be able to respond to other nodes -- * and clients. ++ * Start DataClusterServer and ClusterTSServiceImpl so this node will be able to respond to other ++ * nodes and clients. */ protected void initSubServers() throws TTransportException, StartupException { getDataClusterServer().start(); getDataHeartbeatServer().start(); - clusterTSServiceImpl.setCoordinator(this.coordinator); - clusterTSServiceImpl.start(); - clientServer.setCoordinator(this.coordinator); - clientServer.start(); ++ // TODO FIXME } /** @@@ -715,9 -714,9 +718,9 @@@ /** * Process a HeartBeatResponse from a follower. If the follower has provided its identifier, try * registering for it and if all nodes have registered and there is no available partition table, - * initialize a new one and start the ClusterTSServiceImpl and DataClusterServer. If the follower requires - * initialize a new one and start the ClientServer and DataClusterServer. If the follower requires -- * a partition table, add it to the blind node list so that at the next heartbeat this node will -- * send it a partition table ++ * initialize a new one and start the ClusterTSServiceImpl and DataClusterServer. If the follower ++ * requires a partition table, add it to the blind node list so that at the next heartbeat this ++ * node will send it a partition table */ @Override public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) { @@@ -800,8 -799,8 +803,8 @@@ } /** - * Start the DataClusterServer and ClusterTSServiceImpl` so this node can serve other nodes and clients. - * Start the DataClusterServer and ClientServer so this node can serve other nodes and clients. -- * Also build DataGroupMembers using the partition table. ++ * Start the DataClusterServer and ClusterTSServiceImpl` so this node can serve other nodes and ++ * clients. Also build DataGroupMembers using the partition table. */ protected synchronized void startSubServers() { logger.info("Starting sub-servers..."); @@@ -1452,7 -1451,7 +1455,8 @@@ private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, RaftNode header) throws IOException { RaftService.AsyncClient client = -- getClientProvider().getAsyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS()); ++ getClientProvider() ++ .getAsyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS()); return forwardPlanAsync(plan, receiver, header, client); } @@@ -1461,7 -1460,7 +1465,8 @@@ Client client; try { client = -- getClientProvider().getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS()); ++ getClientProvider() ++ .getSyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS()); } catch (TException e) { throw new IOException(e); } @@@ -1630,7 -1629,7 +1635,7 @@@ client.collectMigrationStatus(migrationStatusHandler); synchronized (resultRef) { if (resultRef.get() == null) { -- resultRef.wait(RaftServer.getConnectionTimeoutInMS()); ++ resultRef.wait(ClusterConstant.getConnectionTimeoutInMS()); } } return ClusterUtils.deserializeMigrationStatus(resultRef.get()); @@@ -1814,9 -1813,9 +1819,10 @@@ // ignore } super.stop(); - if (clusterTSServiceImpl != null) { - clusterTSServiceImpl.stop(); - if (clientServer != null) { - clientServer.stop(); -- } ++ // TODO FIXME ++ // if (clusterTSServiceImpl != null) { ++ // clusterTSServiceImpl.stop(); ++ // } logger.info("{} has been removed from the cluster", name); }) .start(); diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java index 9250785,ce941f5..560c0fe --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java @@@ -18,7 -18,7 +18,8 @@@ */ package org.apache.iotdb.cluster.utils.nodetool; -import org.apache.iotdb.cluster.ClusterMain; ++import org.apache.commons.collections4.map.MultiKeyMap; +import org.apache.iotdb.cluster.ClusterIoTDB; import org.apache.iotdb.cluster.config.ClusterConstant; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.partition.PartitionGroup; @@@ -26,8 -26,8 +27,8 @@@ import org.apache.iotdb.cluster.partiti import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; --import org.apache.iotdb.cluster.server.MetaClusterServer; import org.apache.iotdb.cluster.server.NodeCharacter; ++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl; import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.server.monitor.Timer; @@@ -40,8 -40,8 +41,6 @@@ import org.apache.iotdb.db.service.ISer import org.apache.iotdb.db.service.JMXService; import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.tsfile.utils.Pair; -- --import org.apache.commons.collections4.map.MultiKeyMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -201,11 -201,11 +200,11 @@@ public class ClusterMonitor implements } private MetaGroupMember getMetaGroupMember() { - MetaClusterServer metaClusterServer = ClusterIoTDB.getInstance().getMetaServer(); - MetaClusterServer metaClusterServer = ClusterMain.getMetaServer(); -- if (metaClusterServer == null) { ++ RaftTSMetaServiceImpl RaftTSMetaServiceImpl = ClusterIoTDB.getInstance().getMetaServer(); ++ if (RaftTSMetaServiceImpl == null) { return null; } -- return metaClusterServer.getMember(); ++ return RaftTSMetaServiceImpl.getMember(); } private PartitionTable getPartitionTable() { diff --cc cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java index 7565e9d,70fbb66..c4f1dd7 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java @@@ -20,12 -20,12 +20,11 @@@ package org.apache.iotdb.cluster.integration; import org.apache.iotdb.cluster.config.ClusterDescriptor; --import org.apache.iotdb.cluster.server.MetaClusterServer; ++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl; import org.apache.iotdb.cluster.utils.Constants; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.session.Session; -- import org.junit.After; import org.junit.Before; @@@ -34,7 -34,7 +33,7 @@@ import java.util.List public abstract class BaseSingleNodeTest { -- private MetaClusterServer metaServer; ++ private RaftTSMetaServiceImpl metaServer; private boolean useAsyncServer; private List<String> seedNodeUrls; @@@ -44,14 -44,13 +43,14 @@@ @Before public void setUp() throws Exception { initConfigs(); -- metaServer = new MetaClusterServer(); ++ metaServer = new RaftTSMetaServiceImpl(); metaServer.start(); metaServer.buildCluster(); } @After public void tearDown() throws Exception { - //TODO fixme ++ // TODO fixme metaServer.stop(); recoverConfigs(); EnvironmentUtils.cleanEnv(); diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java index 499efce,5dde20f..60da68d8 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java @@@ -19,15 -19,15 +19,14 @@@ package org.apache.iotdb.cluster.server.clusterinfo; -import org.apache.iotdb.cluster.ClusterMain; +import org.apache.iotdb.cluster.ClusterIoTDB; import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry; import org.apache.iotdb.cluster.rpc.thrift.Node; --import org.apache.iotdb.cluster.server.MetaClusterServer; ++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMemberTest; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.PartialPath; -- import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; @@@ -48,13 -48,13 +47,13 @@@ public class ClusterInfoServiceImplTes metaGroupMemberTest.setUp(); MetaGroupMember metaGroupMember = metaGroupMemberTest.getTestMetaGroupMember(); -- MetaClusterServer metaClusterServer = new MetaClusterServer(); -- metaClusterServer.getMember().stop(); -- metaClusterServer.setMetaGroupMember(metaGroupMember); ++ RaftTSMetaServiceImpl RaftTSMetaServiceImpl = new RaftTSMetaServiceImpl(); ++ RaftTSMetaServiceImpl.getMember().stop(); ++ RaftTSMetaServiceImpl.setMetaGroupMember(metaGroupMember); - ClusterIoTDB.setMetaClusterServer(metaClusterServer); - ClusterMain.setMetaClusterServer(metaClusterServer); ++ ClusterIoTDB.setMetaClusterServer(RaftTSMetaServiceImpl); -- metaClusterServer.getIoTDB().metaManager.setStorageGroup(new PartialPath("root", "sg")); ++ RaftTSMetaServiceImpl.getIoTDB().metaManager.setStorageGroup(new PartialPath("root", "sg")); // metaClusterServer.getMember() impl = new ClusterInfoServiceImpl(); } diff --cc server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java index 6f425ae,6f425ae..7c75e89 --- a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java @@@ -27,6 -27,6 +27,7 @@@ import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; ++import java.util.concurrent.TimeUnit; /** This class is used to create thread pool which must contain the pool name. */ public class IoTDBThreadPoolFactory { @@@ -132,6 -132,6 +133,23 @@@ /** function for creating thrift rpc client thread pool. */ public static ExecutorService createThriftRpcClientThreadPool( ++ int minWorkerThreads, ++ int maxWorkerThreads, ++ int stopTimeoutVal, ++ TimeUnit stopTimeoutUnit, ++ String poolName) { ++ SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>(); ++ return new ThreadPoolExecutor( ++ minWorkerThreads, ++ maxWorkerThreads, ++ stopTimeoutVal, ++ stopTimeoutUnit, ++ executorQueue, ++ new IoTThreadFactory(poolName)); ++ } ++ ++ /** function for creating thrift rpc client thread pool. */ ++ public static ExecutorService createThriftRpcClientThreadPool( TThreadPoolServer.Args args, String poolName, Thread.UncaughtExceptionHandler handler) { SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>(); return new ThreadPoolExecutor( diff --cc server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java index bbbd6f1,0850b75..d37f57e --- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java +++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java @@@ -46,9 -46,7 +46,12 @@@ public enum ThreadName QUERY_SERVICE("Query"), WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"), CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"), - CLUSTER_INFO_SERVICE("ClusterInfoClient"); + CLUSTER_INFO_SERVICE("ClusterInfoClient"), + CLUSTER_RPC_SERVICE("ClusterRPC"), - CLUSTER_RPC_CLIENT("Cluster-RPC-Client"); ++ CLUSTER_RPC_CLIENT("Cluster-RPC-Client"), ++ CLUSTER_META_SERVICE("ClusterMetaService"), ++ CLUSTER_META_HEARTBEAT_SERVICE("ClusterMetaHeartbeatService"), ++ CLUSTER_DATA_SERVICE("ClusterDataService"); private final String name; diff --cc server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java index 6b61323,b2dd13a..83db4bc --- a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java @@@ -43,19 -43,13 +43,20 @@@ public class TracingManager private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; private BufferedWriter writer; private Map<Long, Long> queryStartTime = new ConcurrentHashMap<>(); + private Map<Long, TracingInfo> tracingInfoMap = new ConcurrentHashMap<>(); - public TracingManager(String dirName, String logFileName) { - initTracingManager(dirName, logFileName); + private TracingManager() { + initTracingManager(); } - public void initTracingManager(String dirName, String logFileName) { + public void initTracingManager() { + if (this.writer != null) { - //the tracing manager has been initialized. ++ // the tracing manager has been initialized. + return; + } + String dirName = IoTDBDescriptor.getInstance().getConfig().getTracingDir(); + String logFileName = IoTDBConstant.TRACING_LOG; + File tracingDir = SystemFileFactory.INSTANCE.getFile(dirName); if (!tracingDir.exists()) { if (tracingDir.mkdirs()) { diff --cc server/src/main/java/org/apache/iotdb/db/service/IoTDB.java index 9cd89e6,1154e3d..f15d384 --- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@@ -152,14 -152,6 +152,14 @@@ public class IoTDB implements IoTDBMBea private void deactivate() { logger.info("Deactivating IoTDB..."); - //some user may call Tracing on but do not close tracing. - //so, when remove the system, we have to close the tracing ++ // some user may call Tracing on but do not close tracing. ++ // so, when remove the system, we have to close the tracing + if (IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) { + TracingManager.getInstance().close(); + } + PrimitiveArrayManager.close(); + SystemInfo.getInstance().close(); + registerManager.deregisterAll(); JMXService.deregisterMBean(mbeanName); logger.info("IoTDB is deactivated."); diff --cc server/src/main/java/org/apache/iotdb/db/service/RPCService.java index be5f2f7,5bbddec..6f9292c --- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java @@@ -47,6 -55,6 +47,7 @@@ public class RPCService extends ThriftS (TSServiceImpl) Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName()) .newInstance(); ++ initSyncedServiceImpl(null); processor = new Processor<>(impl); } diff --cc server/src/main/java/org/apache/iotdb/db/service/ServiceType.java index febacac,2f61d90..90b0136 --- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java +++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java @@@ -55,9 -55,6 +55,12 @@@ public enum ServiceType SYSTEMINFO_SERVICE("MemTable Monitor Service", "MemTable, Monitor"), CONTINUOUS_QUERY_SERVICE("Continuous Query Service", "Continuous Query Service"), CLUSTER_INFO_SERVICE("Cluster Monitor Service (thrift-based)", "Cluster Monitor-Thrift"), + - CLUSTER_RPC_SERVICE("Cluster RPC ServerService", "ClusterRPCService"), - ++ CLUSTER_RPC_SERVICE("Cluster RPC Service", "ClusterRPCService"), ++ CLUSTER_META_RPC_SERVICE("Cluster Meta RPC Service", "ClusterMetaRPCService"), ++ CLUSTER_DATA_RPC_SERVICE("Cluster Data RPC Service", "ClusterDataRPCService"), ++ CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"), ++ CLUSTER_META_HEART_BEAT("Cluster Meta Heartbeat Service", "ClusterMetaHeartbeat"), ; private final String name; diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java index dfb2526,d975743..45fb0ec --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java @@@ -19,13 -19,13 +19,10 @@@ package org.apache.iotdb.db.service.thrift; --import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; --import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.JMXService; -- import org.apache.thrift.TProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -78,6 -83,6 +75,17 @@@ public abstract class ThriftService imp JMXService.deregisterMBean(mbeanName); } ++ boolean setSyncedImpl = false; ++ boolean setAsyncedImpl = false; ++ ++ public void initSyncedServiceImpl(Object serviceImpl) { ++ setSyncedImpl = true; ++ } ++ ++ public void initAsyncedServiceImpl(Object serviceImpl) { ++ setAsyncedImpl = true; ++ } ++ public abstract void initTProcessor() throws ClassNotFoundException, IllegalAccessException, InstantiationException; @@@ -101,6 -106,6 +109,10 @@@ try { reset(); initTProcessor(); ++ if (setSyncedImpl || setAsyncedImpl) { ++ throw new StartupException( ++ "At least one service implementataion of {} should be set.", this.getID().getName()); ++ } initThriftServiceThread(); thriftServiceThread.setThreadStopLatch(stopLatch); thriftServiceThread.start(); diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java index 2564d05,612d187..ed2237d --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java @@@ -24,14 -24,14 +24,18 @@@ import org.apache.iotdb.db.conf.IoTDBCo import org.apache.iotdb.db.exception.runtime.RPCServiceException; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.rpc.RpcTransportFactory; -- ++import org.apache.thrift.TBaseAsyncProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocolFactory; ++import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; ++import org.apache.thrift.server.TThreadedSelectorServer; ++import org.apache.thrift.transport.TNonblockingServerSocket; ++import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportException; @@@ -40,6 -40,6 +44,7 @@@ import org.slf4j.LoggerFactory import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.TimeUnit; public class ThriftServiceThread extends Thread { @@@ -51,8 -51,8 +56,95 @@@ private String serviceName; private TProtocolFactory protocolFactory; -- private TThreadPoolServer.Args poolArgs; ++ // currently, we can reuse the ProtocolFactory instance. ++ private static TCompactProtocol.Factory compactProtocolFactory = new TCompactProtocol.Factory(); ++ private static TBinaryProtocol.Factory binaryProtocolFactory = new TBinaryProtocol.Factory(); ++ ++ private void initProtocolFactory(boolean compress) { ++ protocolFactory = getProtocolFactory(compress); ++ } ++ ++ public static TProtocolFactory getProtocolFactory(boolean compress) { ++ if (compress) { ++ return compactProtocolFactory; ++ } else { ++ return binaryProtocolFactory; ++ } ++ } ++ ++ private void catchFailedInitialization(TTransportException e) throws RPCServiceException { ++ close(); ++ if (threadStopLatch == null) { ++ logger.debug("Stop Count Down latch is null"); ++ } else { ++ logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount()); ++ } ++ if (threadStopLatch != null && threadStopLatch.getCount() == 1) { ++ threadStopLatch.countDown(); ++ } ++ logger.debug( ++ "{}: close TThreadPoolServer and TServerSocket for {}", ++ IoTDBConstant.GLOBAL_DB_NAME, ++ serviceName); ++ throw new RPCServiceException( ++ String.format( ++ "%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME, serviceName), ++ e); ++ } ++ ++ /** for asynced ThriftService */ ++ @SuppressWarnings("squid:S107") ++ public ThriftServiceThread( ++ TBaseAsyncProcessor processor, ++ String serviceName, ++ String threadsName, ++ String bindAddress, ++ int port, ++ int maxWorkerThreads, ++ int timeoutSecond, ++ TServerEventHandler serverEventHandler, ++ boolean compress, ++ int connectionTimeoutInMS, ++ int maxReadBufferBytes, ++ ServerType serverType) { ++ initProtocolFactory(compress); ++ this.serviceName = serviceName; ++ try { ++ serverTransport = openNonblockingTransport(bindAddress, port, connectionTimeoutInMS); ++ switch (serverType) { ++ case SELECTOR: ++ TThreadedSelectorServer.Args poolArgs = ++ initAsyncedSelectorPoolArgs( ++ processor, threadsName, maxWorkerThreads, timeoutSecond, maxReadBufferBytes); ++ poolServer = new TThreadedSelectorServer(poolArgs); ++ break; ++ case HSHA: ++ THsHaServer.Args poolArgs1 = ++ initAsyncedHshaPoolArgs( ++ processor, threadsName, maxWorkerThreads, timeoutSecond, maxReadBufferBytes); ++ poolServer = new THsHaServer(poolArgs1); ++ break; ++ } ++ poolServer.setServerEventHandler(serverEventHandler); ++ } catch (TTransportException e) { ++ catchFailedInitialization(e); ++ } ++ } ++ ++ /** ++ * for synced ThriftServiceThread ++ * ++ * @param processor ++ * @param serviceName ++ * @param threadsName ++ * @param bindAddress ++ * @param port ++ * @param maxWorkerThreads ++ * @param timeoutSecond ++ * @param serverEventHandler ++ * @param compress ++ */ @SuppressWarnings("squid:S107") public ThriftServiceThread( TProcessor processor, @@@ -61,53 -61,53 +153,84 @@@ String bindAddress, int port, int maxWorkerThreads, - int timeoutMs, + int timeoutSecond, TServerEventHandler serverEventHandler, boolean compress) { -- if (compress) { -- protocolFactory = new TCompactProtocol.Factory(); -- } else { -- protocolFactory = new TBinaryProtocol.Factory(); -- } ++ initProtocolFactory(compress); this.serviceName = serviceName; try { serverTransport = openTransport(bindAddress, port); -- poolArgs = -- new TThreadPoolServer.Args(serverTransport) -- .maxWorkerThreads(maxWorkerThreads) -- .minWorkerThreads(CommonUtils.getCpuCores()) - .stopTimeoutVal(timeoutSecond); - .stopTimeoutVal(timeoutMs); -- poolArgs.executorService = -- IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName); -- poolArgs.processor(processor); -- poolArgs.protocolFactory(protocolFactory); -- poolArgs.transportFactory(RpcTransportFactory.INSTANCE); ++ TThreadPoolServer.Args poolArgs = ++ initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond); poolServer = new TThreadPoolServer(poolArgs); poolServer.setServerEventHandler(serverEventHandler); } catch (TTransportException e) { -- close(); -- if (threadStopLatch == null) { -- logger.debug("Stop Count Down latch is null"); -- } else { -- logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount()); -- } -- if (threadStopLatch != null && threadStopLatch.getCount() == 1) { -- threadStopLatch.countDown(); -- } -- logger.debug( -- "{}: close TThreadPoolServer and TServerSocket for {}", -- IoTDBConstant.GLOBAL_DB_NAME, -- serviceName); -- throw new RPCServiceException( -- String.format( -- "%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME, serviceName), -- e); ++ catchFailedInitialization(e); } } ++ private TThreadPoolServer.Args initSyncedPoolArgs( ++ TProcessor processor, String threadsName, int maxWorkerThreads, int timeoutSecond) { ++ TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(serverTransport); ++ poolArgs ++ .maxWorkerThreads(maxWorkerThreads) ++ .minWorkerThreads(CommonUtils.getCpuCores()) ++ .stopTimeoutVal(timeoutSecond); ++ poolArgs.executorService = ++ IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName); ++ poolArgs.processor(processor); ++ poolArgs.protocolFactory(protocolFactory); ++ poolArgs.transportFactory(RpcTransportFactory.INSTANCE); ++ return poolArgs; ++ } ++ ++ private TThreadedSelectorServer.Args initAsyncedSelectorPoolArgs( ++ TBaseAsyncProcessor processor, ++ String threadsName, ++ int maxWorkerThreads, ++ int timeoutSecond, ++ int maxReadBufferBytes) { ++ TThreadedSelectorServer.Args poolArgs = ++ new TThreadedSelectorServer.Args((TNonblockingServerTransport) serverTransport); ++ poolArgs.maxReadBufferBytes = maxReadBufferBytes; ++ poolArgs.selectorThreads(CommonUtils.getCpuCores()); ++ poolArgs.executorService( ++ IoTDBThreadPoolFactory.createThriftRpcClientThreadPool( ++ CommonUtils.getCpuCores(), ++ maxWorkerThreads, ++ timeoutSecond, ++ TimeUnit.SECONDS, ++ threadsName)); ++ poolArgs.processor(processor); ++ poolArgs.protocolFactory(protocolFactory); ++ poolArgs.transportFactory(RpcTransportFactory.INSTANCE); ++ return poolArgs; ++ } ++ ++ private THsHaServer.Args initAsyncedHshaPoolArgs( ++ TBaseAsyncProcessor processor, ++ String threadsName, ++ int maxWorkerThreads, ++ int timeoutSecond, ++ int maxReadBufferBytes) { ++ THsHaServer.Args poolArgs = new THsHaServer.Args((TNonblockingServerTransport) serverTransport); ++ poolArgs.maxReadBufferBytes = maxReadBufferBytes; ++ poolArgs.executorService( ++ IoTDBThreadPoolFactory.createThriftRpcClientThreadPool( ++ CommonUtils.getCpuCores(), ++ maxWorkerThreads, ++ timeoutSecond, ++ TimeUnit.SECONDS, ++ threadsName)); ++ poolArgs.processor(processor); ++ poolArgs.protocolFactory(protocolFactory); ++ poolArgs.transportFactory(RpcTransportFactory.INSTANCE); ++ return poolArgs; ++ } ++ @SuppressWarnings("java:S2259") -- public TServerTransport openTransport(String bindAddress, int port) throws TTransportException { ++ private TServerTransport openTransport(String bindAddress, int port) throws TTransportException { int maxRetry = 5; long retryIntervalMS = 5000; TTransportException lastExp = null; @@@ -127,6 -127,6 +250,28 @@@ throw lastExp; } ++ private TServerTransport openNonblockingTransport( ++ String bindAddress, int port, int connectionTimeoutInMS) throws TTransportException { ++ int maxRetry = 5; ++ long retryIntervalMS = 5000; ++ TTransportException lastExp = null; ++ for (int i = 0; i < maxRetry; i++) { ++ try { ++ return new TNonblockingServerSocket( ++ new InetSocketAddress(bindAddress, port), connectionTimeoutInMS); ++ } catch (TTransportException e) { ++ lastExp = e; ++ try { ++ Thread.sleep(retryIntervalMS); ++ } catch (InterruptedException interruptedException) { ++ Thread.currentThread().interrupt(); ++ break; ++ } ++ } ++ } ++ throw lastExp; ++ } ++ public void setThreadStopLatch(CountDownLatch threadStopLatch) { this.threadStopLatch = threadStopLatch; } @@@ -177,4 -177,4 +322,9 @@@ } return false; } ++ ++ public static enum ServerType { ++ SELECTOR, ++ HSHA ++ } } diff --cc server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java index 96ec8ef,96ec8ef..c7a1b9e --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java @@@ -29,7 -29,7 +29,6 @@@ import org.apache.iotdb.db.sync.receive import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer; import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl; import org.apache.iotdb.service.sync.thrift.SyncService; -- import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -61,6 -61,6 +60,7 @@@ public class SyncServerManager extends @Override public void initTProcessor() { ++ initSyncedServiceImpl(null); serviceImpl = new SyncServiceImpl(); processor = new SyncService.Processor<>(serviceImpl); } @@@ -93,6 -93,6 +93,11 @@@ } @Override ++ public int getRPCPort() { ++ return getBindPort(); ++ } ++ ++ @Override public void startService() throws StartupException { IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); if (!config.isSyncEnable()) {
