This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d32fc0713cd568c35cf7e43edcaf1eb8537a94f0 Merge: 850198002e c5192db130 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Apr 13 21:00:22 2022 +0800 Merge branch 'xingtanzjr/modification_pr' into xingtanzjr/write_instance_parallel .github/workflows/influxdb-protocol.yml | 2 +- .github/workflows/main-unix.yml | 3 +- .github/workflows/main-win.yml | 3 +- .../apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4 | 2 +- .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 67 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 24 + .../org/apache/iotdb/cluster/ClusterIoTDB.java | 1 - .../apache/iotdb/cluster/query/ClusterPlanner.java | 1 + .../apache/iotdb/cluster/utils/StatusUtils.java | 6 +- confignode/pom.xml | 5 + .../resources/conf/iotdb-confignode.properties | 13 +- .../iotdb/confignode/conf/ConfigNodeConf.java | 14 + .../confignode/conf/ConfigNodeDescriptor.java | 4 + .../consensus/response/DataPartitionDataSet.java | 22 +- .../consensus/response/SchemaPartitionDataSet.java | 77 +- .../response/StorageGroupSchemaDataSet.java | 12 +- .../iotdb/confignode/manager/ConfigManager.java | 114 ++- .../iotdb/confignode/manager/ConsensusManager.java | 36 +- .../apache/iotdb/confignode/manager/Manager.java | 24 +- .../iotdb/confignode/manager/PartitionManager.java | 130 ++-- .../iotdb/confignode/manager/RegionManager.java | 6 +- .../confignode/partition/StorageGroupSchema.java | 9 + .../persistence/PartitionInfoPersistence.java | 60 +- .../persistence/RegionInfoPersistence.java | 29 +- .../iotdb/confignode/physical/PhysicalPlan.java | 2 +- .../physical/crud/CreateDataPartitionPlan.java | 5 +- .../physical/crud/CreateRegionsPlan.java | 5 +- .../physical/crud/CreateSchemaPartitionPlan.java | 70 +- .../crud/GetOrCreateDataPartitionPlan.java | 6 +- .../crud/GetOrCreateSchemaPartitionPlan.java | 86 ++- .../confignode/service/executor/PlanExecutor.java | 4 +- .../server/ConfigNodeRPCServerProcessor.java | 64 +- .../confignode/consensus/RatisConsensusDemo.java | 10 +- .../hash/DeviceGroupHashExecutorManualTest.java | 5 +- .../physical/SerializeDeserializeUT.java | 45 +- .../server/ConfigNodeRPCServerProcessorTest.java | 284 +++++--- .../iotdb/consensus/ratis/RequestMessage.java | 1 + distribution/pom.xml | 6 + docs/UserGuide/Maintenance-Tools/Sync-Tool.md | 396 +++++++--- docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md | 493 ++++++++++--- influxdb-protocol/pom.xml | 38 +- .../iotdb/influxdb/protocol/dto/SessionPoint.java | 6 +- .../protocol/impl/IoTDBInfluxDBService.java | 12 +- .../iotdb/influxdb/session/InfluxDBSession.java | 42 +- .../sync/IoTDBSyncReceiverCollectorIT.java | 513 +++++++++++++ .../db/integration/sync/IoTDBSyncReceiverIT.java | 357 +++++++++ .../sync/IoTDBSyncReceiverLoaderIT.java | 210 ++++++ .../db/integration/sync/IoTDBSyncSenderIT.java | 304 ++++++++ .../iotdb/db/integration/sync/SyncTestUtil.java | 203 ++++++ .../db/integration/sync/TransportClientMock.java | 69 ++ .../src/test/resources/iotdb-engine.properties | 3 +- integration/src/test/resources/logback-test.xml | 2 +- .../iotdb/commons/cluster/DataNodeLocation.java | 6 +- .../org/apache/iotdb/commons/cluster/Endpoint.java | 6 +- .../iotdb/commons/concurrent/ThreadName.java | 6 +- .../apache/iotdb/commons/conf/IoTDBConstant.java | 15 + .../iotdb/commons/consensus/ConsensusGroupId.java | 5 +- .../iotdb/commons/partition/DataPartition.java | 4 +- .../iotdb/commons/partition/RegionReplicaSet.java | 42 +- .../iotdb/commons/partition/SchemaPartition.java | 120 ++- .../apache/iotdb/commons/service/ServiceType.java | 2 + .../apache/iotdb/commons/utils/CommonUtils.java | 4 +- .../apache/iotdb/commons}/utils/StatusUtils.java | 49 +- pom.xml | 1 + server/pom.xml | 5 + .../resources/conf/iotdb-engine.properties | 19 +- .../resources/conf/iotdb-sync-client.properties | 35 - .../assembly/resources/tools/start-sync-client.bat | 71 -- .../assembly/resources/tools/start-sync-client.sh | 54 -- .../assembly/resources/tools/stop-sync-client.bat | 23 - .../assembly/resources/tools/stop-sync-client.sh | 30 - .../apache/iotdb/db/client/ConfigNodeClient.java | 309 ++++++++ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 81 ++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 43 +- .../consensus/statemachine/BaseStateMachine.java | 2 + .../statemachine/DataRegionStateMachine.java | 37 +- .../statemachine/SchemaRegionStateMachine.java | 17 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 16 +- .../apache/iotdb/db/engine/StorageEngineV2.java | 62 +- .../iotdb/db/engine/modification/Deletion.java | 23 + .../iotdb/db/engine/storagegroup/DataRegion.java | 183 +++-- .../db/engine/storagegroup/TsFileManager.java | 38 + .../db/engine/storagegroup/TsFileProcessor.java | 23 + .../dataregion/StorageGroupManager.java | 10 + .../sync/PipeDataLoadBearableException.java} | 10 +- .../sync/PipeDataLoadException.java} | 10 +- .../sync/PipeDataLoadUnbearableException.java} | 10 +- .../sync/PipeException.java} | 20 +- .../sync/PipeServerException.java} | 22 +- .../PipeSinkException.java} | 24 +- .../iotdb/db/metadata/Executor/SchemaVisitor.java | 96 +++ ...ocalConfigManager.java => LocalConfigNode.java} | 170 ++--- .../db/metadata/LocalSchemaPartitionTable.java | 4 +- .../iotdb/db/metadata/LocalSchemaProcessor.java | 28 +- .../iotdb/db/metadata/mtree/MTreeBelowSG.java | 1 + .../traverser/collector/MeasurementCollector.java | 4 + .../db/metadata/schemaregion/SchemaEngine.java | 27 +- .../db/metadata/schemaregion/SchemaRegion.java | 20 + .../storagegroup/IStorageGroupSchemaManager.java | 8 + .../storagegroup/StorageGroupSchemaManager.java | 28 + .../iotdb/db/mpp/buffer/DataBlockManager.java | 7 +- .../iotdb/db/mpp/buffer/IDataBlockManager.java | 5 + .../apache/iotdb/db/mpp/common/PlanFragmentId.java | 10 +- .../org/apache/iotdb/db/mpp/common/QueryId.java | 8 +- .../apache/iotdb/db/mpp/execution/DataDriver.java | 3 +- .../db/mpp/execution/FragmentInstanceManager.java | 4 +- .../iotdb/db/mpp/execution/QueryExecution.java | 22 +- .../db/mpp/execution/SchemaDriverContext.java | 8 +- .../scheduler/SimpleFragInstanceDispatcher.java | 4 +- .../operator/schema/DevicesSchemaScanOperator.java | 97 +++ .../mpp/operator/schema/SchemaMergeOperator.java | 79 ++ .../db/mpp/operator/schema/SchemaScanOperator.java | 107 +++ .../schema/TimeSeriesSchemaScanOperator.java | 142 ++++ .../db/mpp/operator/source/SeriesScanOperator.java | 2 +- .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 42 +- .../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 7 +- .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java | 7 +- .../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java | 7 +- .../mpp/sql/analyze/StandaloneSchemaFetcher.java | 7 +- .../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java | 5 + .../db/mpp/sql/planner/DistributionPlanner.java | 133 +++- .../db/mpp/sql/planner/LocalExecutionPlanner.java | 65 +- .../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 47 ++ .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 42 ++ .../db/mpp/sql/planner/plan/FragmentInstance.java | 66 +- .../db/mpp/sql/planner/plan/PlanFragment.java | 10 +- .../plan/SimpleFragmentParallelPlanner.java | 6 +- .../planner/plan/WriteFragmentParallelPlanner.java | 17 +- .../db/mpp/sql/planner/plan/node/PlanNode.java | 1 + .../db/mpp/sql/planner/plan/node/PlanNodeType.java | 15 +- .../db/mpp/sql/planner/plan/node/PlanNodeUtil.java | 3 + .../db/mpp/sql/planner/plan/node/PlanVisitor.java | 30 + .../node/metedata/read/DevicesSchemaScanNode.java | 108 +++ .../read/SchemaMergeNode.java} | 44 +- .../plan/node/metedata/read/SchemaScanNode.java | 119 +++ .../metedata/read/TimeSeriesSchemaScanNode.java | 145 ++++ .../node/metedata/write/AlterTimeSeriesNode.java | 37 + .../write/CreateAlignedTimeSeriesNode.java | 13 + .../node/metedata/write/CreateTimeSeriesNode.java | 127 +++- .../planner/plan/node/process/ExchangeNode.java | 1 + .../planner/plan/node/sink/FragmentSinkNode.java | 1 + .../plan/node/source/SeriesAggregateScanNode.java | 10 +- .../planner/plan/node/source/SeriesScanNode.java | 15 +- .../sql/planner/plan/node/source/SourceNode.java | 4 +- .../plan/node/write/InsertMultiTabletsNode.java | 7 +- .../sql/planner/plan/node/write/InsertNode.java | 1 + .../planner/plan/node/write/InsertRowsNode.java | 5 + .../plan/node/write/InsertRowsOfOneDeviceNode.java | 5 + .../db/mpp/sql/statement/StatementVisitor.java | 10 + .../crud/InsertMultiTabletsStatement.java | 8 + .../crud/InsertRowsOfOneDeviceStatement.java | 1 + .../sql/statement/crud/InsertRowsStatement.java | 8 + .../db/mpp/sql/statement/crud/InsertStatement.java | 1 + .../db/mpp/sql/statement/crud/QueryStatement.java | 1 + .../metadata/AlterTimeSeriesStatement.java | 1 + .../metadata/CreateAlignedTimeSeriesStatement.java | 1 + .../metadata/CreateTimeSeriesStatement.java | 1 + .../statement/metadata/ShowDevicesStatement.java | 6 + .../mpp/sql/statement/metadata/ShowStatement.java | 10 + .../metadata/ShowTimeSeriesStatement.java | 6 + .../db/protocol/influxdb/handler/QueryHandler.java | 6 +- .../apache/iotdb/db/qp/constant/SQLConstant.java | 26 + .../apache/iotdb/db/qp/executor/PlanExecutor.java | 189 ++++- .../org/apache/iotdb/db/qp/logical/Operator.java | 14 +- .../db/qp/logical/sys/CreatePipeOperator.java | 69 ++ .../db/qp/logical/sys/CreatePipeSinkOperator.java | 61 ++ .../iotdb/db/qp/logical/sys/DropPipeOperator.java | 24 +- .../db/qp/logical/sys/DropPipeSinkOperator.java | 23 +- .../iotdb/db/qp/logical/sys/ShowPipeOperator.java | 27 +- .../db/qp/logical/sys/ShowPipeServerOperator.java | 20 +- .../db/qp/logical/sys/ShowPipeSinkOperator.java | 27 +- .../qp/logical/sys/ShowPipeSinkTypeOperator.java | 20 +- .../iotdb/db/qp/logical/sys/StartPipeOperator.java | 24 +- .../db/qp/logical/sys/StartPipeServerOperator.java | 18 +- .../iotdb/db/qp/logical/sys/StopPipeOperator.java | 24 +- .../db/qp/logical/sys/StopPipeServerOperator.java | 18 +- .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 10 + .../qp/physical/crud/InsertMultiTabletsPlan.java | 2 +- .../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 2 +- .../physical/sys/CreateAlignedTimeSeriesPlan.java | 15 +- .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 2 +- .../iotdb/db/qp/physical/sys/CreatePipePlan.java | 110 +++ .../db/qp/physical/sys/CreatePipeSinkPlan.java | 98 +++ .../db/qp/physical/sys/CreateTimeSeriesPlan.java | 10 + .../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 2 +- .../iotdb/db/qp/physical/sys/DropPipeSinkPlan.java | 29 +- .../iotdb/db/qp/physical/sys/OperatePipePlan.java | 29 +- .../physical/sys/ShowPipePlan.java} | 22 +- .../physical/sys/ShowPipeServerPlan.java} | 11 +- .../physical/sys/ShowPipeSinkPlan.java} | 23 +- .../physical/sys/ShowPipeSinkTypePlan.java} | 11 +- .../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 6 +- .../db/qp/physical/sys/StartPipeServerPlan.java | 56 ++ .../db/qp/physical/sys/StopPipeServerPlan.java | 56 ++ .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 190 ++++- .../apache/iotdb/db/qp/utils/DatetimeUtils.java | 15 + .../java/org/apache/iotdb/db/service/DataNode.java | 67 +- .../iotdb/db/service/InternalServiceImpl.java | 32 +- .../java/org/apache/iotdb/db/service/IoTDB.java | 11 +- .../thrift/impl/DataNodeManagementServiceImpl.java | 68 +- .../service/thrift/impl/InfluxDBServiceImpl.java | 42 +- .../apache/iotdb/db/sync/conf/SyncConstant.java | 85 +-- .../apache/iotdb/db/sync/conf/SyncPathUtil.java | 100 +++ .../iotdb/db/sync/conf/SyncSenderConfig.java | 127 ---- .../iotdb/db/sync/conf/SyncSenderDescriptor.java | 124 ---- .../org/apache/iotdb/db/sync/package-info.java | 38 - .../iotdb/db/sync/pipedata/DeletionPipeData.java | 85 +++ .../apache/iotdb/db/sync/pipedata/PipeData.java | 96 +++ .../iotdb/db/sync/pipedata/SchemaPipeData.java | 98 +++ .../iotdb/db/sync/pipedata/TsFilePipeData.java | 177 +++++ .../sync/pipedata/queue/BufferedPipeDataQueue.java | 438 +++++++++++ .../queue/PipeDataQueue.java} | 21 +- .../sync/pipedata/queue/PipeDataQueueFactory.java | 57 ++ .../iotdb/db/sync/receiver/ReceiverService.java | 278 +++++++ .../db/sync/receiver/collector/Collector.java | 170 +++++ .../db/sync/receiver/load/DeletionLoader.java | 48 ++ .../iotdb/db/sync/receiver/load/FileLoader.java | 203 ------ .../db/sync/receiver/load/FileLoaderManager.java | 213 ------ .../iotdb/db/sync/receiver/load/IFileLoader.java | 50 -- .../iotdb/db/sync/receiver/load/ILoadLogger.java | 57 -- .../receiver/load/{LoadType.java => ILoader.java} | 12 +- .../iotdb/db/sync/receiver/load/LoadLogger.java | 72 -- .../iotdb/db/sync/receiver/load/SchemaLoader.java | 60 ++ .../iotdb/db/sync/receiver/load/TsFileLoader.java | 67 ++ .../iotdb/db/sync/receiver/manager/PipeInfo.java | 85 +++ .../db/sync/receiver/manager/PipeMessage.java | 76 ++ .../db/sync/receiver/manager/ReceiverManager.java | 216 ++++++ .../sync/receiver/recover/ISyncReceiverLogger.java | 50 -- .../receiver/recover/SyncReceiverLogAnalyzer.java | 154 ---- .../sync/receiver/recover/SyncReceiverLogger.java | 72 -- .../db/sync/receiver/recovery/ReceiverLog.java | 127 ++++ .../receiver/recovery/ReceiverLogAnalyzer.java | 157 ++++ .../db/sync/receiver/transfer/SyncServiceImpl.java | 370 ---------- .../db/sync/sender/manage/ISyncFileManager.java | 72 -- .../db/sync/sender/manage/SyncFileManager.java | 291 -------- .../db/sync/sender/manager/SchemaSyncManager.java | 163 +++++ .../db/sync/sender/manager/TsFileSyncManager.java | 118 +++ .../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java | 97 +++ .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 106 +++ .../sender/pipe/PipeSink.java} | 41 +- .../iotdb/db/sync/sender/pipe/TsFilePipe.java | 334 +++++++++ .../sender/recover/ISyncSenderLogAnalyzer.java | 47 -- .../db/sync/sender/recover/ISyncSenderLogger.java | 67 -- .../sync/sender/recover/SyncSenderLogAnalyzer.java | 128 ---- .../db/sync/sender/recover/SyncSenderLogger.java | 72 -- .../db/sync/sender/recovery/SenderLogAnalyzer.java | 172 +++++ .../db/sync/sender/recovery/SenderLogger.java | 141 ++++ .../db/sync/sender/recovery/TsFilePipeLogger.java | 150 ++++ .../db/sync/sender/service/SenderService.java | 417 +++++++++++ .../db/sync/sender/service/TransportHandler.java | 127 ++++ .../iotdb/db/sync/sender/transfer/ISyncClient.java | 95 --- .../iotdb/db/sync/sender/transfer/SyncClient.java | 810 --------------------- .../client/ITransportClient.java} | 19 +- .../db/sync/transport/client/TransportClient.java | 527 ++++++++++++++ .../conf/TransportConfig.java} | 26 +- .../transport/conf/TransportConstant.java} | 24 +- .../server/TransportServerManager.java} | 80 +- .../server/TransportServerManagerMBean.java} | 6 +- .../server/TransportServerThriftHandler.java} | 30 +- .../transport/server/TransportServiceImpl.java | 385 ++++++++++ .../org/apache/iotdb/db/utils/DataTypeUtils.java | 8 +- .../apache/iotdb/db/utils/EnvironmentUtils.java | 8 + .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 2 +- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 181 ++++- .../org/apache/iotdb/db/utils/StatusUtils.java | 46 -- .../java/org/apache/iotdb/db/utils/SyncUtils.java | 109 --- .../iotdb/db/wal/recover/WALNodeRecoverTask.java | 4 +- .../db/mpp/operator/SchemaScanOperatorTest.java | 242 ++++++ .../db/mpp/sql/plan/DistributionPlannerTest.java | 108 ++- .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 16 +- .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 85 +++ .../sql/plan/node/PlanNodeDeserializeHelper.java | 3 +- .../metadata/read/ShowDevicesNodeSerdeTest.java | 3 +- .../sql/plan/node/process/OffsetNodeSerdeTest.java | 6 +- .../plan/node/sink/FragmentSinkNodeSerdeTest.java | 3 +- .../source/SeriesAggregateScanNodeSerdeTest.java | 2 +- .../iotdb/db/qp/physical/PhysicalPlanTest.java | 28 + .../iotdb/db/service/InternalServiceImplTest.java | 167 +++++ .../sync/pipedata/BufferedPipeDataQueueTest.java | 542 ++++++++++++++ .../iotdb/db/sync/pipedata/PipeDataTest.java | 86 +++ .../db/sync/receiver/load/FileLoaderTest.java | 405 ----------- .../sync/receiver/manager/ReceiverManagerTest.java | 98 +++ .../recover/SyncReceiverLogAnalyzerTest.java | 229 ------ .../receiver/recover/SyncReceiverLoggerTest.java | 115 --- .../receiver/recovery/ReceiverLogAnalyzerTest.java | 124 ++++ .../db/sync/sender/manage/SyncFileManagerTest.java | 350 --------- .../sender/recover/SyncSenderLogAnalyzerTest.java | 201 ----- .../sync/sender/recover/SyncSenderLoggerTest.java | 112 --- .../db/sync/sender/transfer/SyncClientTest.java | 161 ---- .../db/sync/transport/TransportServiceTest.java | 205 ++++++ server/src/test/resources/iotdb-engine.properties | 4 +- server/src/test/resources/logback-test.xml | 2 +- .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 15 +- .../iotdb/rpc/StatementExecutionException.java | 4 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 4 +- .../src/main/thrift/confignode.thrift | 67 +- thrift-influxdb/src/main/thrift/influxdb.thrift | 36 +- thrift-sync/src/main/thrift/sync.thrift | 51 -- thrift-sync/src/main/thrift/transport.thrift | 90 +++ thrift/src/main/thrift/common.thrift | 43 +- 300 files changed, 14532 insertions(+), 6902 deletions(-) diff --cc node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java index 115c4406c4,d98695824d..fa3e61ef10 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java @@@ -46,16 -45,19 +45,18 @@@ public class RegionReplicaSet this.dataNodeList = dataNodeList; } - public ConsensusGroupId getId() { - return id; + public ConsensusGroupId getConsensusGroupId() { + return consensusGroupId; } - public void setId(ConsensusGroupId id) { - this.id = id; + public void setConsensusGroupId(ConsensusGroupId consensusGroupId) { + this.consensusGroupId = consensusGroupId; } + @Override public String toString() { - return String.format("RegionReplicaSet[%s-%s]: %s", id.getType(), id, dataNodeList); + return String.format( - "RegionReplicaSet[%s-%d]: %s", - consensusGroupId.getType(), consensusGroupId.getId(), dataNodeList); ++ "RegionReplicaSet[%s-%s]: %s", consensusGroupId.getType(), consensusGroupId, dataNodeList); } public void serializeImpl(ByteBuffer buffer) { diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java index f87979a2a2,4ef647dc7e..7836d3460f --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java @@@ -22,9 -22,19 +22,22 @@@ import org.apache.iotdb.commons.partiti import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.analyze.QueryType; - import org.apache.iotdb.db.mpp.sql.planner.plan.*; - import org.apache.iotdb.db.mpp.sql.planner.plan.node.*; + import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan; + import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance; + import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner; + import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan; + import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment; + import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner; + import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan; ++import org.apache.iotdb.db.mpp.sql.planner.plan.WriteFragmentParallelPlanner; ++import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; + import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; + import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; + import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor; + import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter; + import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode; + import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode; @@@ -83,9 -92,8 +98,10 @@@ public class DistributionPlanner // Convert fragment to detailed instance // And for parallel-able fragment, clone it into several instances with different params. public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) { - IFragmentParallelPlaner parallelPlaner = context.getQueryType() == QueryType.READ ? - new SimpleFragmentParallelPlanner(subPlan, analysis, context) : - new WriteFragmentParallelPlanner(subPlan, analysis, context); + IFragmentParallelPlaner parallelPlaner = - new SimpleFragmentParallelPlanner(subPlan, analysis, context); ++ context.getQueryType() == QueryType.READ ++ ? new SimpleFragmentParallelPlanner(subPlan, analysis, context) ++ : new WriteFragmentParallelPlanner(subPlan, analysis, context); return parallelPlaner.parallelPlan(); } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java index c263732b00,3e55a5d32c..72177f06c8 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java @@@ -30,9 -31,8 +30,7 @@@ import org.apache.iotdb.tsfile.read.fil import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - import java.io.IOException; import java.nio.ByteBuffer; --import java.util.Objects; public class FragmentInstance implements IConsensusRequest { private final FragmentInstanceId id; @@@ -48,21 -50,23 +48,31 @@@ // We can add some more params for a specific FragmentInstance // So that we can make different FragmentInstance owns different data range. - public FragmentInstance(PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) { - public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) { ++ public FragmentInstance( ++ PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) { this.fragment = fragment; this.timeFilter = timeFilter; - this.id = generateId(fragment.getId(), index); + this.id = id; this.type = type; } - public static FragmentInstanceId generateId(PlanFragmentId id, int index) { - return new FragmentInstanceId(id, String.valueOf(index)); + public RegionReplicaSet getDataRegionId() { - return dataRegion; ++ return regionReplicaSet; ++ } ++ ++ public void setDataRegionAndHost(RegionReplicaSet regionReplicaSet) { ++ this.regionReplicaSet = regionReplicaSet; ++ // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current ++ // instance ++ this.hostEndpoint = regionReplicaSet.getDataNodeList().get(0).getEndPoint(); } - public void setDataRegionAndHost(RegionReplicaSet dataRegion) { - this.dataRegion = dataRegion; - // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance - this.hostEndpoint = dataRegion.getDataNodeList().get(0).getEndPoint(); + public RegionReplicaSet getRegionReplicaSet() { + return regionReplicaSet; + } + + public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) { + this.regionReplicaSet = regionReplicaSet; } public Endpoint getHostEndpoint() { @@@ -120,17 -133,10 +135,9 @@@ Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null; QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)]; FragmentInstance fragmentInstance = - new FragmentInstance( - planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType); + new FragmentInstance(planFragment, id, timeFilter, queryType); - RegionReplicaSet regionReplicaSet = new RegionReplicaSet(); - try { - regionReplicaSet.deserializeImpl(buffer); - } catch (IOException e) { - e.printStackTrace(); - } - Endpoint endpoint = new Endpoint(); - endpoint.deserializeImpl(buffer); - fragmentInstance.dataRegion = regionReplicaSet; - fragmentInstance.hostEndpoint = endpoint; + fragmentInstance.regionReplicaSet = RegionReplicaSet.deserializeImpl(buffer); + fragmentInstance.hostEndpoint = Endpoint.deserializeImpl(buffer); return fragmentInstance; } @@@ -144,20 -150,25 +151,7 @@@ timeFilter.serialize(buffer); } ReadWriteIOUtils.write(type.ordinal(), buffer); - dataRegion.serializeImpl(buffer); + regionReplicaSet.serializeImpl(buffer); hostEndpoint.serializeImpl(buffer); } -- -- @Override -- public boolean equals(Object o) { -- if (this == o) return true; -- if (o == null || getClass() != o.getClass()) return false; -- FragmentInstance instance = (FragmentInstance) o; - return Objects.equals(id, instance.id) && type == instance.type && Objects.equals(fragment, instance.fragment) && Objects.equals(dataRegion, instance.dataRegion) && Objects.equals(hostEndpoint, instance.hostEndpoint) && Objects.equals(timeFilter, instance.timeFilter); - return Objects.equals(id, instance.id) - && type == instance.type - && Objects.equals(fragment, instance.fragment) - && Objects.equals(regionReplicaSet, instance.regionReplicaSet) - && Objects.equals(hostEndpoint, instance.hostEndpoint) - && Objects.equals(timeFilter, instance.timeFilter); -- } -- -- @Override -- public int hashCode() { - return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter); - return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter); -- } } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java index 280d9891af,d3357ec275..631b217695 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java @@@ -99,7 -100,11 +99,7 @@@ public class SimpleFragmentParallelPlan // We need to store all the replica host in case of the scenario that the instance need to be // redirected // to another host when scheduling - fragmentInstance.setDataRegionAndHost(dataRegion); - fragmentInstance.setRegionReplicaSet(regionReplicaSet); - - // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current - // instance - fragmentInstance.setHostEndpoint(regionReplicaSet.getDataNodeList().get(0).getEndPoint()); ++ fragmentInstance.setDataRegionAndHost(regionReplicaSet); instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); fragmentInstanceList.add(fragmentInstance); } diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java index eebf705cd3,0000000000..d8c9435214 mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java @@@ -1,61 -1,0 +1,70 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.sql.planner.plan; + +import org.apache.iotdb.db.mpp.common.MPPQueryContext; +import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +import java.util.ArrayList; +import java.util.List; + - public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner{ ++public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner { + + private SubPlan subPlan; + private Analysis analysis; + private MPPQueryContext queryContext; + - public WriteFragmentParallelPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) { ++ public WriteFragmentParallelPlanner( ++ SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) { + this.subPlan = subPlan; + this.analysis = analysis; + this.queryContext = queryContext; + } + + @Override + public List<FragmentInstance> parallelPlan() { + PlanFragment fragment = subPlan.getPlanFragment(); - Filter timeFilter = analysis.getQueryFilter() != null ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter() : null; ++ Filter timeFilter = ++ analysis.getQueryFilter() != null ++ ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter() ++ : null; + PlanNode node = fragment.getRoot(); + if (!(node instanceof IWritePlanNode)) { + throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation"); + } + List<IWritePlanNode> splits = ((IWritePlanNode) node).splitByPartition(analysis); + List<FragmentInstance> ret = new ArrayList<>(); + for (IWritePlanNode split : splits) { - FragmentInstance instance = new FragmentInstance(new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType()); ++ FragmentInstance instance = ++ new FragmentInstance( ++ new PlanFragment(fragment.getId(), split), ++ fragment.getId().genFragmentInstanceId(), ++ timeFilter, ++ queryContext.getQueryType()); + instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet()); + ret.add(instance); + } + return ret; + } +} diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java index 49b1065a05,47fbed6dff..3fbbf98f0f --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java @@@ -20,8 -20,8 +20,9 @@@ package org.apache.iotdb.db.mpp.sql.pla import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.partition.RegionReplicaSet; + import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.exception.NotImplementedException; @@@ -112,9 -112,9 +113,9 @@@ public class InsertMultiTabletsNode ext Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>(); for (int i = 0; i < insertTabletNodeList.size(); i++) { InsertTabletNode insertTabletNode = insertTabletNodeList.get(i); - List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis); - for (InsertNode subNode : tmpResult) { - RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet(); + List<IWritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis); + for (IWritePlanNode subNode : tmpResult) { - RegionReplicaSet dataRegionReplicaSet = ((InsertNode)subNode).getDataRegionReplicaSet(); ++ RegionReplicaSet dataRegionReplicaSet = ((InsertNode) subNode).getDataRegionReplicaSet(); if (splitMap.containsKey(dataRegionReplicaSet)) { InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet); tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i); diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java index d7eef6b50b,30b091d83b..2aead26a69 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java @@@ -26,9 -27,11 +26,10 @@@ import org.apache.iotdb.db.mpp.sql.plan import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + import java.nio.ByteBuffer; -import java.util.List; -public abstract class InsertNode extends PlanNode { +public abstract class InsertNode extends IWritePlanNode { /** * if use id table, this filed is id form of device path <br> diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java index 3cba509506,b3012c9986..4a8583d999 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java @@@ -20,9 -20,9 +20,10 @@@ package org.apache.iotdb.db.mpp.sql.pla import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.partition.RegionReplicaSet; + import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.exception.NotImplementedException; diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java index 8dc7fd7671,3c22c0fd57..3b5079e625 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java @@@ -20,9 -20,9 +20,10 @@@ package org.apache.iotdb.db.mpp.sql.pla import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.partition.RegionReplicaSet; + import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.mpp.sql.analyze.Analysis; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.exception.NotImplementedException; diff --cc server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java index ed192c357b,51a2313f08..39d9c6a71d --- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java @@@ -41,13 -46,10 +46,12 @@@ import org.apache.iotdb.db.mpp.sql.plan import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode; - import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; ++import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import com.google.common.collect.Sets; - import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.junit.Test; import java.util.ArrayList; @@@ -205,35 -254,6 +256,32 @@@ public class DistributionPlannerTest assertEquals(3, plan.getInstances().size()); } + @Test + public void TestWriteParallelPlan() throws IllegalPathException { + QueryId queryId = new QueryId("test_write"); - InsertRowNode insertRowNode = new InsertRowNode( - queryId.genPlanNodeId(), - new PartialPath("root.sg.d1"), - false, - new MeasurementSchema[]{ - new MeasurementSchema("s1", TSDataType.INT32), - }, - new TSDataType[]{ - TSDataType.INT32 - }, - 1L, - new Object[]{ - 10 - }); ++ InsertRowNode insertRowNode = ++ new InsertRowNode( ++ queryId.genPlanNodeId(), ++ new PartialPath("root.sg.d1"), ++ false, ++ new MeasurementSchema[] { ++ new MeasurementSchema("s1", TSDataType.INT32), ++ }, ++ new TSDataType[] {TSDataType.INT32}, ++ 1L, ++ new Object[] {10}); + + Analysis analysis = constructAnalysis(); + + MPPQueryContext context = + new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint()); + DistributionPlanner planner = + new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode)); + DistributedQueryPlan plan = planner.planFragments(); + plan.getInstances().forEach(System.out::println); + assertEquals(1, plan.getInstances().size()); + } + private Analysis constructAnalysis() { Analysis analysis = new Analysis(); diff --cc server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java index f57f60dda6,6871ff6560..4bf6a477e2 --- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java @@@ -18,8 -18,6 +18,7 @@@ */ package org.apache.iotdb.db.mpp.sql.plan; - import com.google.common.collect.ImmutableList; +import org.apache.iotdb.commons.cluster.DataNodeLocation; import org.apache.iotdb.commons.cluster.Endpoint; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.partition.RegionReplicaSet; @@@ -45,6 -43,6 +44,7 @@@ import org.apache.iotdb.tsfile.read.exp import org.apache.iotdb.tsfile.read.filter.GroupByFilter; import org.apache.iotdb.tsfile.read.filter.operator.Gt; ++import com.google.common.collect.ImmutableList; import org.junit.Test; import java.nio.ByteBuffer; @@@ -64,8 -61,9 +64,10 @@@ public class FragmentInstanceSerdeTest new GroupByFilter(1, 2, 3, 4), QueryType.READ); RegionReplicaSet regionReplicaSet = - new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666)))); - new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()); - fragmentInstance.setRegionReplicaSet(regionReplicaSet); - fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666)); ++ new RegionReplicaSet( ++ new DataRegionId(1), ++ ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666)))); + fragmentInstance.setDataRegionAndHost(regionReplicaSet); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); fragmentInstance.serializeRequest(byteBuffer); @@@ -84,8 -81,9 +86,10 @@@ null, QueryType.READ); RegionReplicaSet regionReplicaSet = - new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667)))); - new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()); - fragmentInstance.setRegionReplicaSet(regionReplicaSet); - fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667)); ++ new RegionReplicaSet( ++ new DataRegionId(1), ++ ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667)))); + fragmentInstance.setDataRegionAndHost(regionReplicaSet); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); fragmentInstance.serializeRequest(byteBuffer); diff --cc server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java index 0000000000,7ed08d65b6..d5ba4d1f07 mode 000000,100644..100644 --- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java +++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java @@@ -1,0 -1,164 +1,167 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.iotdb.db.service; + + import org.apache.iotdb.commons.cluster.DataNodeLocation; + import org.apache.iotdb.commons.cluster.Endpoint; + import org.apache.iotdb.commons.consensus.SchemaRegionId; + import org.apache.iotdb.commons.partition.RegionReplicaSet; + import org.apache.iotdb.consensus.common.Peer; + import org.apache.iotdb.db.conf.IoTDBConfig; + import org.apache.iotdb.db.conf.IoTDBDescriptor; + import org.apache.iotdb.db.consensus.ConsensusImpl; + import org.apache.iotdb.db.exception.metadata.MetadataException; + import org.apache.iotdb.db.metadata.LocalConfigNode; + import org.apache.iotdb.db.metadata.path.PartialPath; + import org.apache.iotdb.db.mpp.common.PlanFragmentId; + import org.apache.iotdb.db.mpp.sql.analyze.QueryType; + import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance; + import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment; + import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; + import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode; + import org.apache.iotdb.db.utils.EnvironmentUtils; + import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId; + import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance; + import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq; + import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp; + import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + import org.apache.iotdb.tsfile.read.filter.GroupByFilter; + + import org.apache.ratis.util.FileUtils; + import org.junit.After; + import org.junit.Assert; + import org.junit.Before; + import org.junit.Test; + + import java.io.File; + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.HashMap; + import java.util.List; + + public class InternalServiceImplTest { + private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); + InternalServiceImpl internalServiceImpl; + LocalConfigNode configNode; + + @Before + public void setUp() throws Exception { + IoTDB.configManager.init(); + configNode = LocalConfigNode.getInstance(); + configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln")); + ConsensusImpl.getInstance().start(); + RegionReplicaSet regionReplicaSet = genRegionReplicaSet(); + ConsensusImpl.getInstance() + .addConsensusGroup(regionReplicaSet.getConsensusGroupId(), genPeerList(regionReplicaSet)); + internalServiceImpl = new InternalServiceImpl(); + } + + @After + public void tearDown() throws Exception { + IoTDB.configManager.clear(); + RegionReplicaSet regionReplicaSet = genRegionReplicaSet(); + ConsensusImpl.getInstance().removeConsensusGroup(regionReplicaSet.getConsensusGroupId()); + ConsensusImpl.getInstance().stop(); + EnvironmentUtils.cleanEnv(); + FileUtils.deleteFully(new File("data" + File.separator + "consensus")); + } + + @Test + public void createTimeseriesTest() throws MetadataException { + configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln")); + CreateTimeSeriesNode createTimeSeriesNode = + new CreateTimeSeriesNode( + new PlanNodeId("0"), + new PartialPath("root.ln.wf01.wt01.status"), + TSDataType.BOOLEAN, + TSEncoding.PLAIN, + CompressionType.SNAPPY, + new HashMap<String, String>() { + { + put("MAX_POINT_NUMBER", "3"); + } + }, + new HashMap<String, String>() { + { + put("tag1", "v1"); + put("tag2", "v2"); + } + }, + new HashMap<String, String>() { + { + put("attr1", "a1"); + put("attr2", "a2"); + } + }, + "meter1"); + + RegionReplicaSet regionReplicaSet = genRegionReplicaSet(); + PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode); + FragmentInstance fragmentInstance = - new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE); - fragmentInstance.setRegionReplicaSet(regionReplicaSet); - fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666)); ++ new FragmentInstance( ++ planFragment, ++ planFragment.getId().genFragmentInstanceId(), ++ new GroupByFilter(1, 2, 3, 4), ++ QueryType.WRITE); ++ fragmentInstance.setDataRegionAndHost(regionReplicaSet); + + // serialize fragmentInstance + ByteBuffer byteBuffer = ByteBuffer.allocate(1024); + fragmentInstance.serializeRequest(byteBuffer); + byteBuffer.flip(); + + // put serialized fragmentInstance to TSendFragmentInstanceReq + TSendFragmentInstanceReq request = new TSendFragmentInstanceReq(); + TFragmentInstance tFragmentInstance = new TFragmentInstance(); + tFragmentInstance.setBody(byteBuffer); + request.setFragmentInstance(tFragmentInstance); + request.setConsensusGroupId( + new TConsensusGroupId( + regionReplicaSet.getConsensusGroupId().getId(), + regionReplicaSet.getConsensusGroupId().getType().toString())); + request.setQueryType(QueryType.WRITE.toString()); + + // Use consensus layer to execute request + TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request); + + Assert.assertTrue(response.accepted); + } + + private RegionReplicaSet genRegionReplicaSet() { + List<DataNodeLocation> dataNodeList = new ArrayList<>(); + dataNodeList.add( + new DataNodeLocation( + conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort()))); + + // construct fragmentInstance + SchemaRegionId schemaRegionId = new SchemaRegionId(0); + return new RegionReplicaSet(schemaRegionId, dataNodeList); + } + + private List<Peer> genPeerList(RegionReplicaSet regionReplicaSet) { + List<Peer> peerList = new ArrayList<>(); + for (DataNodeLocation node : regionReplicaSet.getDataNodeList()) { + peerList.add(new Peer(regionReplicaSet.getConsensusGroupId(), node.getEndPoint())); + } + return peerList; + } + }
