This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f342899dec2f1d056c027e7d7ef9dc62d07906e5 Merge: 327eb7e e09b377 Author: lta <[email protected]> AuthorDate: Thu Jan 14 18:58:33 2021 +0800 merge master docker/src/main/Dockerfile-0.11.0 => .dockerignore | 24 +- .github/pull_request_template.md | 37 +- .github/workflows/e2e.yml | 52 ++ NOTICE | 2 +- NOTICE-binary | 2 +- README.md | 15 + README_ZH.md | 14 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 17 +- .../java/org/apache/iotdb/cli/AbstractCli.java | 138 ++- cli/src/main/java/org/apache/iotdb/cli/Cli.java | 2 + cli/src/main/java/org/apache/iotdb/cli/WinCli.java | 3 + .../org/apache/iotdb/cli/utils/IoTPrinter.java | 107 +++ .../main/java/org/apache/iotdb/tool/ImportCsv.java | 63 +- cluster/pom.xml | 7 + cluster/src/assembly/resources/sbin/start-node.sh | 2 +- .../java/org/apache/iotdb/cluster/ClusterMain.java | 20 +- .../apache/iotdb/cluster/RemoteTsFileResource.java | 41 +- .../cluster/client/async/AsyncClientPool.java | 79 +- .../iotdb/cluster/client/sync/SyncClientPool.java | 24 +- .../apache/iotdb/cluster/config/ClusterConfig.java | 14 +- .../iotdb/cluster/config/ClusterConstant.java | 33 +- .../iotdb/cluster/coordinator/Coordinator.java | 602 +++++++++++++ .../apache/iotdb/cluster/log/LogDispatcher.java | 16 +- .../cluster/log/applier/AsyncDataLogApplier.java | 4 +- .../iotdb/cluster/log/applier/MetaLogApplier.java | 8 +- .../iotdb/cluster/log/catchup/CatchUpTask.java | 2 +- .../cluster/log/manage/CommittedEntryManager.java | 5 +- .../log/manage/MetaSingleSnapshotLogManager.java | 1 + .../iotdb/cluster/log/manage/RaftLogManager.java | 26 +- .../log/manage/UnCommittedEntryManager.java | 10 +- .../apache/iotdb/cluster/metadata/CMManager.java | 10 +- .../cluster/query/manage/QueryCoordinator.java | 115 +-- .../apache/iotdb/cluster/server/ClientServer.java | 28 +- .../iotdb/cluster/server/DataClusterServer.java | 2 +- .../iotdb/cluster/server/MetaClusterServer.java | 17 +- .../apache/iotdb/cluster/server/RaftServer.java | 7 +- .../handlers/caller/AppendNodeEntryHandler.java | 6 +- .../server/handlers/caller/HeartbeatHandler.java | 2 +- .../cluster/server/heartbeat/HeartbeatThread.java | 8 +- .../cluster/server/member/DataGroupMember.java | 17 +- .../cluster/server/member/MetaGroupMember.java | 198 +++-- .../iotdb/cluster/server/member/RaftMember.java | 130 ++- .../cluster/server/{ => monitor}/NodeReport.java | 3 +- .../manage => server/monitor}/NodeStatus.java | 41 +- .../monitor/NodeStatusManager.java} | 87 +- .../iotdb/cluster/server/{ => monitor}/Peer.java | 2 +- .../iotdb/cluster/server/{ => monitor}/Timer.java | 15 +- .../cluster/server/service/MetaAsyncService.java | 6 + .../cluster/server/service/MetaSyncService.java | 5 + .../apache/iotdb/cluster/utils/ClusterUtils.java | 9 +- .../cluster/utils/nodetool/ClusterMonitor.java | 2 +- .../iotdb/cluster/common/EnvironmentUtils.java | 218 ----- .../org/apache/iotdb/cluster/common/IoTDBTest.java | 4 +- .../cluster/integration/BaseSingleNodeTest.java | 2 +- .../iotdb/cluster/integration/SingleNodeTest.java | 7 +- .../iotdb/cluster/log/CommitLogCallbackTest.java | 2 +- .../iotdb/cluster/log/CommitLogTaskTest.java | 2 +- .../iotdb/cluster/log/LogDispatcherTest.java | 2 +- .../log/applier/AsyncDataLogApplierTest.java | 2 +- .../cluster/log/applier/DataLogApplierTest.java | 13 +- .../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 9 +- .../cluster/log/catchup/LogCatchUpTaskTest.java | 8 +- .../log/catchup/SnapshotCatchUpTaskTest.java | 8 +- .../manage/MetaSingleSnapshotLogManagerTest.java | 3 + .../cluster/log/manage/RaftLogManagerTest.java | 22 +- .../cluster/log/snapshot/DataSnapshotTest.java | 12 +- .../log/snapshot/MetaSimpleSnapshotTest.java | 2 + .../cluster/log/snapshot/PullSnapshotTaskTest.java | 5 + .../iotdb/cluster/partition/SlotManagerTest.java | 2 +- .../cluster/partition/SlotPartitionTableTest.java | 2 +- .../apache/iotdb/cluster/query/BaseQueryTest.java | 15 +- .../query/ClusterAggregateExecutorTest.java | 48 +- .../query/ClusterDataQueryExecutorTest.java | 22 +- .../cluster/query/ClusterFillExecutorTest.java | 72 +- .../cluster/query/ClusterPlanExecutorTest.java | 8 +- .../cluster/query/ClusterQueryRouterTest.java | 216 ++--- .../ClusterGroupByNoVFilterDataSetTest.java | 64 +- .../groupby/ClusterGroupByVFilterDataSetTest.java | 74 +- .../query/groupby/MergeGroupByExecutorTest.java | 83 +- .../query/groupby/RemoteGroupByExecutorTest.java | 146 ++-- .../cluster/query/manage/QueryCoordinatorTest.java | 15 +- .../query/reader/ClusterTimeGeneratorTest.java | 34 +- .../cluster/query/reader/DatasourceInfoTest.java | 16 +- .../reader/RemoteSeriesReaderByTimestampTest.java | 122 +-- .../query/reader/RemoteSimpleSeriesReaderTest.java | 136 +-- .../caller/AppendGroupEntryHandlerTest.java | 2 +- .../caller/AppendNodeEntryHandlerTest.java | 4 +- .../handlers/caller/ElectionHandlerTest.java | 2 +- .../handlers/caller/HeartbeatHandlerTest.java | 2 +- .../handlers/caller/LogCatchUpHandlerTest.java | 2 +- .../server/heartbeat/DataHeartbeatThreadTest.java | 5 + .../server/heartbeat/HeartbeatThreadTest.java | 10 +- .../server/heartbeat/MetaHeartbeatThreadTest.java | 5 + .../cluster/server/member/DataGroupMemberTest.java | 129 +-- .../iotdb/cluster/server/member/MemberTest.java | 35 +- .../cluster/server/member/MetaGroupMemberTest.java | 83 +- docker/src/main/Dockerfile | 46 +- docker/src/main/Dockerfile-0.10.0 | 4 +- docker/src/main/Dockerfile-0.10.1 | 4 +- docker/src/main/Dockerfile-0.11.0 | 4 +- .../main/{Dockerfile-0.11.0 => Dockerfile-0.11.1} | 10 +- .../main/{Dockerfile-0.11.0 => Dockerfile-0.11.2} | 10 +- docs/UserGuide/Client/Programming - Native API.md | 18 + docs/UserGuide/Client/Status Codes.md | 2 + .../Ecosystem Integration/Zeppelin-IoTDB.md | 20 +- docs/UserGuide/Operation Manual/Administration.md | 2 + .../DML Data Manipulation Language.md | 155 +++- .../Operation Manual/UDF User Defined Function.md | 192 ++++- docs/UserGuide/Server/Config Manual.md | 18 + docs/UserGuide/System Tools/CSV Tool.md | 1 + docs/zh/UserGuide/Client/Command Line Interface.md | 2 +- .../UserGuide/Client/Programming - Native API.md | 14 + docs/zh/UserGuide/Client/Status Codes.md | 2 + .../Ecosystem Integration/Zeppelin-IoTDB.md | 20 +- .../UserGuide/Operation Manual/Administration.md | 2 + .../DML Data Manipulation Language.md | 156 +++- .../Operation Manual/UDF User Defined Function.md | 195 ++++- docs/zh/UserGuide/Server/Config Manual.md | 17 + docs/zh/UserGuide/System Tools/CSV Tool.md | 1 + .../main/java/org/apache/iotdb/SessionExample.java | 20 +- grafana/img/add_data_source.png | Bin 175851 -> 108927 bytes grafana/img/add_graph.png | Bin 723579 -> 364163 bytes grafana/img/edit_data_source.png | Bin 313673 -> 177869 bytes .../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java | 4 +- .../main/java/org/apache/iotdb/jdbc/Config.java | 18 +- .../org/apache/iotdb/jdbc/IoTDBConnection.java | 15 +- .../apache/iotdb/jdbc/IoTDBConnectionParams.java | 20 +- .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 4 +- .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java | 6 +- .../apache/iotdb/jdbc/IoTDBPreparedStatement.java | 8 +- .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 29 +- .../src/main/java/org/apache/iotdb/jdbc/Utils.java | 7 + pom.xml | 2 +- .../file-changelists/TsFileResource-changelist.md | 8 +- .../resources/conf/iotdb-engine.properties | 28 + server/src/assembly/resources/conf/logback.xml | 25 +- server/src/assembly/resources/sbin/start-server.sh | 2 +- .../org/apache/iotdb/db/auth/AuthorityChecker.java | 5 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 43 +- .../org/apache/iotdb/db/conf/IoTDBConstant.java | 14 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 16 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 37 +- .../db/engine/cache/TimeSeriesMetadataCache.java | 32 +- .../compaction/CompactionMergeTaskPoolManager.java | 32 +- .../db/engine/compaction/TsFileManagement.java | 38 +- .../level/LevelCompactionTsFileManagement.java | 258 +++--- .../no/NoCompactionTsFileManagement.java | 10 +- .../engine/compaction/utils/CompactionLogger.java | 2 +- .../engine/compaction/utils/CompactionUtils.java | 12 +- .../apache/iotdb/db/engine/flush/FlushManager.java | 24 +- .../iotdb/db/engine/flush/MemTableFlushTask.java | 48 +- .../merge/selector/MaxFileMergeFileSelector.java | 11 +- .../iotdb/db/engine/merge/task/MergeFileTask.java | 138 ++- .../db/engine/merge/task/MergeMultiChunkTask.java | 4 +- .../io/LocalTextModificationAccessor.java | 61 +- .../engine/storagegroup/StorageGroupProcessor.java | 220 ++--- .../db/engine/storagegroup/TsFileProcessor.java | 89 +- .../db/engine/storagegroup/TsFileResource.java | 356 +++----- .../storagegroup/timeindex/DeviceTimeIndex.java | 308 +++++++ .../storagegroup/timeindex/FileTimeIndex.java | 193 +++++ .../engine/storagegroup/timeindex/ITimeIndex.java | 138 +++ .../storagegroup/timeindex/TimeIndexLevel.java} | 43 +- .../apache/iotdb/db/exception/IoTDBException.java | 21 + .../db/exception/PartitionViolationException.java | 8 +- ...xception.java => QueryIdNotExsitException.java} | 10 +- .../iotdb/db/exception/StorageEngineException.java | 2 +- .../db/exception/UDFRegistrationException.java | 7 +- .../iotdb/db/exception/WriteProcessException.java | 4 + .../metadata/AliasAlreadyExistException.java | 1 + .../exception/metadata/IllegalPathException.java | 1 + .../db/exception/metadata/MetadataException.java | 8 + .../metadata/PathAlreadyExistException.java | 1 + .../exception/metadata/PathNotExistException.java | 20 +- .../metadata/StorageGroupNotSetException.java | 5 + .../db/exception/query/OutOfTTLException.java | 2 +- .../db/exception/query/QueryProcessException.java | 6 +- .../QueryTimeoutRuntimeException.java} | 68 +- .../org/apache/iotdb/db/metadata/MManager.java | 61 +- .../java/org/apache/iotdb/db/metadata/MTree.java | 77 +- .../iotdb/db/metadata/logfile/MLogWriter.java | 143 +--- .../apache/iotdb/db/metrics/ui/MetricsPage.java | 2 +- .../apache/iotdb/db/monitor/MonitorConstants.java | 5 - .../org/apache/iotdb/db/monitor/StatMonitor.java | 6 +- .../org/apache/iotdb/db/mqtt/PublishHandler.java | 123 +-- .../main/java/org/apache/iotdb/db/qp/Planner.java | 1 + .../apache/iotdb/db/qp/constant/SQLConstant.java | 12 + .../apache/iotdb/db/qp/executor/IPlanExecutor.java | 8 + .../apache/iotdb/db/qp/executor/PlanExecutor.java | 192 ++++- .../org/apache/iotdb/db/qp/logical/Operator.java | 5 +- ...TracingOperator.java => KillQueryOperator.java} | 20 +- .../db/qp/logical/sys/RemoveFileOperator.java | 5 - .../db/qp/logical/sys/ShowDevicesOperator.java | 18 + .../iotdb/db/qp/logical/sys/TracingOperator.java | 10 +- .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 8 +- .../iotdb/db/qp/physical/crud/InsertRowPlan.java | 25 + .../physical/crud/InsertRowsOfOneDevicePlan.java | 154 ++++ .../apache/iotdb/db/qp/physical/crud/UDFPlan.java | 3 +- .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 3 +- .../db/qp/physical/sys/AlterTimeSeriesPlan.java | 2 +- .../iotdb/db/qp/physical/sys/AuthorPlan.java | 2 +- .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 6 +- .../db/qp/physical/sys/CreateTimeSeriesPlan.java | 14 +- .../{ShowDevicesPlan.java => KillQueryPlan.java} | 25 +- .../iotdb/db/qp/physical/sys/ShowDevicesPlan.java | 13 +- .../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 51 +- .../physical/sys/ShowQueryProcesslistPlan.java} | 56 +- .../db/qp/physical/sys/ShowTimeSeriesPlan.java | 56 +- .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 34 +- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 31 +- .../db/qp/{constant => utils}/DatetimeUtils.java | 12 +- .../db/query/aggregation/AggregateResult.java | 3 +- .../db/query/aggregation/impl/AvgAggrResult.java | 10 +- .../db/query/aggregation/impl/SumAggrResult.java | 10 +- .../iotdb/db/query/control/QueryFileManager.java | 2 + .../db/query/control/QueryResourceManager.java | 3 + .../iotdb/db/query/control/QueryTimeManager.java | 172 ++++ .../iotdb/db/query/control/TracingManager.java | 2 +- .../apache/iotdb/db/query/dataset/ListDataSet.java | 12 +- .../db/query/dataset/NonAlignEngineDataSet.java | 22 + .../dataset/RawQueryDataSetWithoutValueFilter.java | 39 +- .../apache/iotdb/db/query/dataset/ShowDataSet.java | 78 ++ .../iotdb/db/query/dataset/ShowDevicesDataSet.java | 58 ++ .../db/query/dataset/ShowTimeseriesDataSet.java | 46 +- .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 2 +- .../dataset/groupby/GroupByEngineDataSet.java | 8 +- .../groupby/GroupByWithValueFilterDataSet.java | 3 +- .../db/query/executor/AggregationExecutor.java | 4 +- .../db/query/executor/RawDataQueryExecutor.java | 4 +- .../iotdb/db/query/reader/series/SeriesReader.java | 14 + .../org/apache/iotdb/db/query/udf/api/UDF.java | 15 + .../org/apache/iotdb/db/query/udf/api/UDTF.java | 22 +- .../db/query/udf/api/collector/PointCollector.java | 4 +- .../api/customizer/config/UDTFConfigurations.java | 3 +- .../parameter/UDFParameterValidator.java | 209 +++++ .../api/customizer/parameter/UDFParameters.java | 32 + .../strategy/SlidingTimeWindowAccessStrategy.java | 2 +- .../UDFAttributeNotProvidedException.java | 9 +- .../udf/api/exception/UDFException.java} | 62 +- .../UDFInputSeriesDataTypeNotValidException.java} | 34 +- .../UDFInputSeriesIndexNotValidException.java} | 60 +- .../UDFInputSeriesNumberNotValidException.java} | 17 +- .../UDFParameterNotValidException.java} | 56 +- .../db/query/udf/builtin/BuiltinFunction.java | 76 ++ .../iotdb/db/query/udf/builtin/UDTFAbs.java} | 148 ++-- .../udf/{api/UDF.java => builtin/UDTFAcos.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFAsin.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFAtan.java} | 57 +- .../iotdb/db/query/udf/builtin/UDTFBottomK.java | 105 +++ .../udf/{api/UDF.java => builtin/UDTFCeil.java} | 57 +- .../db/query/udf/builtin/UDTFCommonDerivative.java | 62 ++ .../udf/builtin/UDTFCommonValueDifference.java | 60 ++ .../iotdb/db/query/udf/builtin/UDTFContains.java} | 110 +-- .../udf/{api/UDF.java => builtin/UDTFCos.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFDegrees.java} | 57 +- .../db/query/udf/builtin/UDTFDerivative.java} | 111 +-- .../udf/{api/UDF.java => builtin/UDTFExp.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFFloor.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFLog.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFLog10.java} | 57 +- .../iotdb/db/query/udf/builtin/UDTFMatches.java} | 111 +-- .../iotdb/db/query/udf/builtin/UDTFMath.java | 89 ++ .../udf/builtin/UDTFNonNegativeDerivative.java | 63 ++ .../builtin/UDTFNonNegativeValueDifference.java | 61 ++ .../udf/{api/UDF.java => builtin/UDTFRadians.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFRound.java} | 57 +- .../iotdb/db/query/udf/builtin/UDTFSelectK.java | 156 ++++ .../udf/{api/UDF.java => builtin/UDTFSign.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFSin.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFSqrt.java} | 57 +- .../udf/{api/UDF.java => builtin/UDTFTan.java} | 57 +- .../db/query/udf/builtin/UDTFTimeDifference.java} | 110 +-- .../iotdb/db/query/udf/builtin/UDTFTopK.java | 103 +++ .../db/query/udf/builtin/UDTFValueDifference.java} | 107 ++- .../iotdb/db/query/udf/builtin/UDTFValueTrend.java | 73 ++ .../db/query/udf/core/executor/UDTFExecutor.java | 35 +- .../udf/core/transformer/UDFQueryTransformer.java | 18 +- .../query/udf/service/UDFClassLoaderManager.java | 9 +- .../udf/service/UDFRegistrationInformation.java | 25 +- .../query/udf/service/UDFRegistrationService.java | 145 +++- .../apache/iotdb/db/rescon/MemTableManager.java | 116 +++ .../org/apache/iotdb/db/rescon/SystemInfo.java | 1 + .../java/org/apache/iotdb/db/service/IoTDB.java | 9 +- .../apache/iotdb/db/service/RegisterManager.java | 17 +- .../org/apache/iotdb/db/service/ServiceType.java | 1 + .../org/apache/iotdb/db/service/TSServiceImpl.java | 948 +++++++++------------ .../db/sync/receiver/load/FileLoaderManager.java | 6 +- .../iotdb/db/sync/sender/transfer/SyncClient.java | 41 +- .../apache/iotdb/db/tools/IoTDBDataDirViewer.java | 4 +- .../iotdb/db/tools/TsFileResourcePrinter.java | 11 +- .../db/tools/watermark/WatermarkDetector.java | 2 +- .../org/apache/iotdb/db/utils/FilePathUtils.java | 4 + .../org/apache/iotdb/db/utils/SchemaUtils.java | 9 +- .../apache/iotdb/db/utils/TypeInferenceUtils.java | 3 + .../main/resources/iotdb/ui/static/iotdb-logo.png | Bin 1768 -> 1187 bytes .../compaction/LevelCompactionMergeTest.java | 4 +- .../compaction/LevelCompactionRecoverTest.java | 10 +- .../LevelCompactionTsFileManagementTest.java | 1 - .../NoCompactionTsFileManagementTest.java | 1 - .../engine/storagegroup/TsFileProcessorTest.java | 14 +- .../iotdb/db/integration/IOTDBGroupByIT.java | 1 - .../iotdb/db/integration/IoTDBClearCacheIT.java | 2 +- .../iotdb/db/integration/IoTDBDeletionIT.java | 35 + .../apache/iotdb/db/integration/IoTDBFillIT.java | 6 + .../db/integration/IoTDBFlushQueryMergeIT.java | 2 +- .../integration/IoTDBGroupByFillWithRangeIT.java | 3 - .../iotdb/db/integration/IoTDBInsertNaNIT.java | 46 +- .../iotdb/db/integration/IoTDBKillQueryTest.java | 84 ++ .../apache/iotdb/db/integration/IoTDBLastIT.java | 1 - .../db/integration/IoTDBLevelCompactionIT.java | 1 - .../db/integration/IoTDBLoadExternalTsfileIT.java | 20 +- .../iotdb/db/integration/IoTDBMergeTest.java | 5 + .../iotdb/db/integration/IoTDBMultiSeriesIT.java | 26 +- .../db/integration/IoTDBQueryTimeoutTest.java | 154 ++++ .../iotdb/db/integration/IoTDBRestartIT.java | 40 +- .../db/integration/IoTDBRpcCompressionIT.java | 1 - .../iotdb/db/integration/IoTDBSensorUpdateIT.java | 3 - .../iotdb/db/integration/IoTDBSimpleQueryIT.java | 52 ++ .../iotdb/db/integration/IoTDBUDFManagementIT.java | 163 +++- .../integration/IoTDBUDTFAlignByTimeQueryIT.java | 213 +++++ .../db/integration/IoTDBUDTFBuiltinFunctionIT.java | 250 ++++++ .../db/integration/IoTDBUDTFHybridQueryIT.java | 6 +- .../aggregation/IoTDBAggregationIT.java | 24 +- .../aggregation/IoTDBAggregationSmallDataIT.java | 4 +- .../org/apache/iotdb/db/metadata/MTreeTest.java | 23 + .../iotdb/db/monitor/IoTDBStatMonitorTest.java | 153 ++++ .../java/org/apache/iotdb/db/qp/PlannerTest.java | 59 +- .../qp/{plan => logical}/IndexLogicalPlanTest.java | 2 +- .../qp/{plan => logical}/LogicalPlanSmallTest.java | 3 +- .../qp/{plan => physical}/ConcatOptimizerTest.java | 3 +- .../IndexSubMatchingPhysicalPlanTest.java | 3 +- .../IndexWholeMatchingPhysicalPlanTest.java | 3 +- .../db/qp/physical/PhysicalPlanSerializeTest.java | 305 +++++++ .../db/qp/{plan => physical}/PhysicalPlanTest.java | 4 +- .../qp/{plan => physical}/SerializationTest.java | 3 +- .../db/qp/sql/DatetimeQueryDataSetUtilsTest.java | 142 --- .../IoTDBsqlVisitorTest.java} | 4 +- .../db/qp/utils/DatetimeQueryDataSetUtilsTest.java | 190 +++++ .../iotdb/db/query/control/TracingManagerTest.java | 15 +- .../iotdb/db/query/dataset/ListDataSetTest.java | 2 +- .../db/query/reader/series/SeriesReaderTest.java | 22 +- .../iotdb/db/query/udf/example/Accumulator.java | 16 +- .../apache/iotdb/db/query/udf/example/Adder.java | 19 +- .../apache/iotdb/db/query/udf/example/Counter.java | 8 +- ...gSizeWindowConstructorTester1.java => Max.java} | 130 +-- .../iotdb/db/query/udf/example/Multiplier.java | 16 +- .../SlidingSizeWindowConstructorTester0.java | 9 +- .../SlidingSizeWindowConstructorTester1.java | 17 +- .../SlidingTimeWindowConstructionTester.java | 17 +- ...onstructorTester1.java => TerminateTester.java} | 122 +-- .../{Multiplier.java => ValidateTester.java} | 100 +-- .../db/sync/receiver/load/FileLoaderTest.java | 8 +- .../recover/SyncReceiverLogAnalyzerTest.java | 4 +- .../org/apache/iotdb/db/tools/MLogParserTest.java | 35 +- .../apache/iotdb/db/utils/EnvironmentUtils.java | 13 +- .../db/utils/datastructure/PrecisionTest.java | 22 +- .../db/writelog/recover/DeviceStringTest.java | 8 +- server/src/test/resources/iotdb-engine.properties | 2 + server/src/test/resources/logback.xml | 1 + .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 9 +- .../org/apache/iotdb/rpc/RpcTransportFactory.java | 14 +- .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 8 +- .../rpc/TCompressedElasticFramedTransport.java | 45 +- .../apache/iotdb/rpc/TElasticFramedTransport.java | 41 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../iotdb/rpc/TSnappyElasticFramedTransport.java | 2 +- .../rpc/TimeoutChangeableTFastFramedTransport.java | 11 +- .../main/java/org/apache/iotdb/session/Config.java | 13 +- .../java/org/apache/iotdb/session/Session.java | 176 +++- .../apache/iotdb/session/SessionConnection.java | 34 +- .../org/apache/iotdb/session/SessionDataSet.java | 12 +- .../org/apache/iotdb/session/pool/SessionPool.java | 81 ++ .../iotdb/session/IoTDBSessionComplexIT.java | 3 +- .../iotdb/session/IoTDBSessionIteratorIT.java | 29 + .../apache/iotdb/session/IoTDBSessionSimpleIT.java | 130 +++ .../public/img/contributor-avatar/jlq.png | Bin 122303 -> 98500 bytes .../public/img/contributor-avatar/kfx.jpeg | Bin 134257 -> 119029 bytes .../public/img/contributor-avatar/xdh.jpg | Bin 136069 -> 123718 bytes .../.vuepress/public/img/contributor-avatar/yt.jpg | Bin 131028 -> 115804 bytes .../public/img/contributor-avatar/zss.jpg | Bin 98085 -> 91455 bytes site/src/main/.vuepress/public/img/home-Slide1.png | Bin 438294 -> 323108 bytes site/src/main/.vuepress/public/img/home-Slide2.png | Bin 440893 -> 323620 bytes site/src/main/.vuepress/public/img/home-Slide3.png | Bin 441335 -> 324332 bytes site/src/main/.vuepress/public/img/home-icon2.png | Bin 5529 -> 4463 bytes site/src/main/.vuepress/public/img/home-icon3.png | Bin 20637 -> 10753 bytes site/src/main/.vuepress/public/img/home-icon4.png | Bin 13225 -> 8681 bytes site/src/main/.vuepress/public/img/home-icon5.png | Bin 1989 -> 1531 bytes site/src/main/.vuepress/public/img/home-icon6.png | Bin 16502 -> 11537 bytes site/src/main/.vuepress/public/img/logo.png | Bin 31747 -> 21687 bytes site/src/main/.vuepress/public/img/tools.jpg | Bin 347602 -> 294103 bytes spark-iotdb-connector/Readme.md | 32 +- spark-tsfile/README.md | 31 +- .../e2e/base/docker-compose.yaml | 54 +- test/e2e/cases/README.md | 53 ++ .../e2e/cases/cli/README.md | 20 +- .../e2e/cases/cli/cleanup.sh | 30 +- .../e2e/cases/cli/docker-compose.yaml | 44 +- test/e2e/cases/cli/res/init.sql | 26 + .../Dockerfile-0.11.0 => test/e2e/cases/cli/run.sh | 51 +- thrift/src/main/thrift/cluster.thrift | 7 + thrift/src/main/thrift/rpc.thrift | 15 + .../tsfile/encoding/bitpacking/IntPacker.java | 4 +- .../tsfile/encoding/bitpacking/LongPacker.java | 4 +- .../exception/QueryTimeoutRuntimeException.java | 68 +- .../write/UnSupportedDataTypeException.java | 4 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 195 +++-- .../iotdb/tsfile/read/reader/LocalTsFileInput.java | 20 + .../org/apache/iotdb/tsfile/utils/BytesUtils.java | 4 +- .../apache/iotdb/tsfile/write/TsFileWriter.java | 13 +- .../org/apache/iotdb/tsfile/read/ReadTest.java | 22 +- .../iotdb/tsfile/read/TimePlainEncodeReadTest.java | 5 - .../tsfile/read/TimeSeriesMetadataReadTest.java | 87 ++ .../read/query/executor/QueryExecutorTest.java | 1 - .../apache/iotdb/tsfile/utils/FileGenerator.java | 22 +- .../tsfile/write/DefaultDeviceTemplateTest.java | 110 +++ zeppelin-interpreter/pom.xml | 2 +- .../apache/zeppelin/iotdb/IoTDBInterpreter.java | 212 +++-- .../zeppelin/iotdb/IoTDBInterpreterTest.java | 108 ++- 417 files changed, 12524 insertions(+), 5187 deletions(-) diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java index bcde426,39994bb..5266f4f --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java @@@ -299,10 -310,11 +312,11 @@@ public class ClusterMain public int calculateSlotByPartitionNum(String storageGroupName, long partitionId, int maxSlotNum) { int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k; - if (sgSerialNum > 0) { - return maxSlotNum / k * sgSerialNum; + if (sgSerialNum >= 0) { + return (int)(maxSlotNum / k * (sgSerialNum + 0.5)); } else { - return defaultStrategy.calculateSlotByPartitionNum(storageGroupName, partitionId, maxSlotNum); + return defaultStrategy + .calculateSlotByPartitionNum(storageGroupName, partitionId, maxSlotNum); } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java index 0000000,4e15c6f..dbfb9ff mode 000000,100644..100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java @@@ -1,0 -1,598 +1,602 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.iotdb.cluster.coordinator; + + import org.apache.iotdb.cluster.client.async.AsyncDataClient; + import org.apache.iotdb.cluster.client.sync.SyncDataClient; + import org.apache.iotdb.cluster.config.ClusterDescriptor; + import org.apache.iotdb.cluster.exception.CheckConsistencyException; ++import org.apache.iotdb.cluster.exception.UnknownLogTypeException; + import org.apache.iotdb.cluster.exception.UnsupportedPlanException; + import org.apache.iotdb.cluster.metadata.CMManager; + import org.apache.iotdb.cluster.partition.PartitionGroup; + import org.apache.iotdb.cluster.query.ClusterPlanRouter; + import org.apache.iotdb.cluster.rpc.thrift.Node; ++import org.apache.iotdb.cluster.rpc.thrift.RaftNode; + import org.apache.iotdb.cluster.rpc.thrift.RaftService; + import org.apache.iotdb.cluster.server.RaftServer; + import org.apache.iotdb.cluster.server.monitor.Timer; + import org.apache.iotdb.cluster.server.member.MetaGroupMember; + import org.apache.iotdb.cluster.utils.PartitionUtils; + import org.apache.iotdb.cluster.utils.StatusUtils; + import org.apache.iotdb.db.conf.IoTDBConstant; + import org.apache.iotdb.db.exception.metadata.IllegalPathException; + import org.apache.iotdb.db.exception.metadata.MetadataException; + import org.apache.iotdb.db.exception.metadata.PathNotExistException; + import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; + import org.apache.iotdb.db.exception.query.QueryProcessException; + import org.apache.iotdb.db.qp.physical.PhysicalPlan; + import org.apache.iotdb.db.qp.physical.crud.DeletePlan; + import org.apache.iotdb.db.qp.physical.crud.InsertPlan; + import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; + import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; + import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; + import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; + import org.apache.iotdb.db.service.IoTDB; + import org.apache.iotdb.rpc.RpcUtils; + import org.apache.iotdb.rpc.TSStatusCode; + import org.apache.iotdb.service.rpc.thrift.EndPoint; + import org.apache.iotdb.service.rpc.thrift.TSStatus; + import org.apache.thrift.TException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.List; + import java.util.Map; + + /** + * Coordinator of client non-query request + */ + public class Coordinator { + + private static final Logger logger = LoggerFactory.getLogger(Coordinator.class); + + private MetaGroupMember metaGroupMember; + + private String name; + private Node thisNode; + /** + * router calculates the partition groups that a partitioned plan should be sent to + */ + private ClusterPlanRouter router; + + private static final String MSG_MULTIPLE_ERROR = "The following errors occurred when executing " + + "the query, please retry or contact the DBA: "; + + public Coordinator(MetaGroupMember metaGroupMember) { + this.metaGroupMember = metaGroupMember; + this.name = metaGroupMember.getName(); + this.thisNode = metaGroupMember.getThisNode(); + } + + public Coordinator() { + + } + + public void setMetaGroupMember(MetaGroupMember metaGroupMember) { + this.metaGroupMember = metaGroupMember; + this.name = metaGroupMember.getName(); + this.thisNode = metaGroupMember.getThisNode(); + } + + public void setRouter(ClusterPlanRouter router) { + this.router = router; + } + + /** + * Execute a non-query plan. According to the type of the plan, the plan will be executed on all + * nodes (like timeseries deletion) or the nodes that belong to certain groups (like data + * ingestion). + * + * @param plan a non-query plan. + */ + public TSStatus executeNonQueryPlan(PhysicalPlan plan) { + TSStatus result; + long startTime = Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.getOperationStartTime(); + if (PartitionUtils.isLocalNonQueryPlan(plan)) { + // run locally + result = executeNonQueryLocally(plan); + } else if (PartitionUtils.isGlobalMetaPlan(plan)) { + //forward the plan to all meta group nodes + result = metaGroupMember.processNonPartitionedMetaPlan(plan); + } else if (PartitionUtils.isGlobalDataPlan(plan)) { + //forward the plan to all data group nodes + result = processNonPartitionedDataPlan(plan); + } else { + //split the plan and forward them to some PartitionGroups + try { + result = processPartitionedPlan(plan); + } catch (UnsupportedPlanException e) { + return StatusUtils.getStatus(StatusUtils.UNSUPPORTED_OPERATION, e.getMessage()); + } + } + Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime); + return result; + } + + /** + * execute a non-query plan that is not necessary to be executed on other nodes. + */ + private TSStatus executeNonQueryLocally(PhysicalPlan plan) { + boolean execRet; + try { + execRet = metaGroupMember.getLocalExecutor().processNonQuery(plan); + } catch (QueryProcessException e) { + if (e.getErrorCode() != TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) { + logger.debug("meet error while processing non-query. ", e); + } else { + logger.warn("meet error while processing non-query. ", e); + } + return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); + } catch (Exception e) { + logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + } + + return execRet + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully") + : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR); + } + + /** + * A non-partitioned plan (like DeleteData) should be executed on all data group nodes, so the + * DataGroupLeader should take the responsible to make sure that every node receives the plan. + * Thus the plan will be processed locally only by the DataGroupLeader and forwarded by non-leader + * nodes. + */ + private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) { + if (plan instanceof DeleteTimeSeriesPlan || plan instanceof DeletePlan) { + try { + // as delete related plans may have abstract paths (paths with wildcards), we convert + // them to full paths so the executor nodes will not need to query the metadata holders, + // eliminating the risk that when they are querying the metadata holders, the timeseries + // has already been deleted + ((CMManager) IoTDB.metaManager).convertToFullPaths(plan); + } catch (PathNotExistException e) { + if (plan.getPaths().isEmpty()) { + // only reports an error when there is no matching path + return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage()); + } + } + } + try { + metaGroupMember.syncLeaderWithConsistencyCheck(true); + List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups(); + logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size()); + return forwardPlan(globalGroups, plan); + } catch (CheckConsistencyException e) { + logger.debug("Forwarding global data plan {} to meta leader {}", plan, metaGroupMember.getLeader()); + metaGroupMember.waitLeader(); + return metaGroupMember.forwardPlan(plan, metaGroupMember.getLeader(), null); + } + } + + + /** + * A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to + * a data group. And these sub-plans will be sent to and executed on the corresponding groups + * separately. + */ + public TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException { + logger.debug("{}: Received a partitioned plan {}", name, plan); + if (metaGroupMember.getPartitionTable() == null) { + logger.debug("{}: Partition table is not ready", name); + return StatusUtils.PARTITION_TABLE_NOT_READY; + } + + // split the plan into sub-plans that each only involve one data group + Map<PhysicalPlan, PartitionGroup> planGroupMap; + try { + planGroupMap = splitPlan(plan); + } catch (CheckConsistencyException checkConsistencyException) { + return StatusUtils + .getStatus(StatusUtils.CONSISTENCY_FAILURE, checkConsistencyException.getMessage()); + } + + // the storage group is not found locally + if (planGroupMap == null || planGroupMap.isEmpty()) { + if ((plan instanceof InsertPlan + || plan instanceof CreateTimeSeriesPlan + || plan instanceof CreateMultiTimeSeriesPlan) + && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) { + logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan); + try { + ((CMManager) IoTDB.metaManager).createSchema(plan); + return processPartitionedPlan(plan); + } catch (MetadataException | CheckConsistencyException e) { + logger.error( + String.format("Failed to set storage group or create timeseries, because %s", e)); + } + } + logger.error("{}: Cannot find storage groups for {}", name, plan); + return StatusUtils.NO_STORAGE_GROUP; + } + logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap); + return forwardPlan(planGroupMap, plan); + } + + /** + * Forward a plan to all DataGroupMember groups. Only when all nodes time out, will a TIME_OUT be + * returned. The error messages from each group (if any) will be compacted into one string. + * + * @param partitionGroups + * @param plan + */ + private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan plan) { + // the error codes from the groups that cannot execute the plan + TSStatus status; + List<String> errorCodePartitionGroups = new ArrayList<>(); + for (PartitionGroup partitionGroup : partitionGroups) { + if (partitionGroup.contains(thisNode)) { + // the query should be handled by a group the local node is in, handle it with in the group + logger.debug("Execute {} in a local group of {}", plan, partitionGroup.getHeader()); - status = metaGroupMember.getLocalDataMember(partitionGroup.getHeader()) - .executeNonQueryPlan(plan); ++ status = metaGroupMember.getLocalDataMember(new RaftNode(partitionGroup.getHeader(), ++ partitionGroup.getId())).executeNonQueryPlan(plan); + } else { + // forward the query to the group that should handle it + logger.debug("Forward {} to a remote group of {}", plan, + partitionGroup.getHeader()); + status = forwardPlan(plan, partitionGroup); + } + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && ( + !(plan instanceof DeleteTimeSeriesPlan) || + status.getCode() != TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())) { + // execution failed, record the error message + errorCodePartitionGroups.add(String.format("[%s@%s:%s]", + status.getCode(), partitionGroup.getHeader(), + status.getMessage())); + } + } + if (errorCodePartitionGroups.isEmpty()) { + status = StatusUtils.OK; + } else { + status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, + MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString()); + } + logger.debug("{}: executed {} with answer {}", name, plan, status); + return status; + } + + /** + * split a plan into several sub-plans, each belongs to only one data group. + */ + private Map<PhysicalPlan, PartitionGroup> splitPlan(PhysicalPlan plan) + throws UnsupportedPlanException, CheckConsistencyException { + Map<PhysicalPlan, PartitionGroup> planGroupMap = null; + try { + planGroupMap = router.splitAndRoutePlan(plan); + } catch (StorageGroupNotSetException e) { + // synchronize with the leader to see if this node has unpulled storage groups + metaGroupMember.syncLeaderWithConsistencyCheck(true); + try { + planGroupMap = router.splitAndRoutePlan(plan); - } catch (MetadataException ex) { ++ } catch (MetadataException | UnknownLogTypeException ex) { + // ignore + } - } catch (MetadataException e) { ++ } catch (MetadataException | UnknownLogTypeException e) { + logger.error("Cannot route plan {}", plan, e); + } + return planGroupMap; + } + + /** + * Forward plans to the DataGroupMember of one node in the corresponding group. Only when all + * nodes time out, will a TIME_OUT be returned. + * + * @param planGroupMap sub-plan -> belong data group pairs + */ + private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) { + // the error codes from the groups that cannot execute the plan + TSStatus status; + if (planGroupMap.size() == 1) { + status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next()); + } else { + if (plan instanceof InsertTabletPlan || plan instanceof CreateMultiTimeSeriesPlan) { + // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, each will correspond to a TSStatus as its + // execution result, as the plan is split and the sub-plans may have interleaving ranges, + // we must assure that each TSStatus is placed to the right position + // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2 + // belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the + // failure and success should be placed into proper positions in TSStatus.subStatus + status = forwardMultiSubPlan(planGroupMap, plan); + } else { + status = forwardToMultipleGroup(planGroupMap); + } + } + if (plan instanceof InsertPlan + && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode() + && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) { + TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan)); + if (tmpStatus != null) { + status = tmpStatus; + } + } + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && status + .isSetRedirectNode()) { + status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); + } + logger.debug("{}: executed {} with answer {}", name, plan, status); + return status; + } + + private TSStatus createTimeseriesForFailedInsertion( + Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) { + // try to create timeseries + if (plan.getFailedMeasurements() != null) { + plan.getPlanFromFailed(); + } + boolean hasCreate; + try { + hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan); + } catch (IllegalPathException | CheckConsistencyException e) { + return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + if (hasCreate) { + return forwardPlan(planGroupMap, plan); + } else { + logger.error("{}, Cannot auto create timeseries.", thisNode); + } + return null; + } + + + private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, PartitionGroup> entry) { + TSStatus result; + if (entry.getValue().contains(thisNode)) { + // the query should be handled by a group the local node is in, handle it with in the group + long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP + .getOperationStartTime(); + logger.debug("Execute {} in a local group of {}", entry.getKey(), + entry.getValue().getHeader()); - result = metaGroupMember.getLocalDataMember(entry.getValue().getHeader()) ++ result = metaGroupMember.getLocalDataMember(new RaftNode(entry.getValue().getHeader(), entry.getValue().getId())) + .executeNonQueryPlan(entry.getKey()); + Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP + .calOperationCostTimeFromStart(startTime); + } else { + // forward the query to the group that should handle it + long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP + .getOperationStartTime(); + logger.debug("Forward {} to a remote group of {}", entry.getKey(), + entry.getValue().getHeader()); + result = forwardPlan(entry.getKey(), entry.getValue()); + Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP + .calOperationCostTimeFromStart(startTime); + } + return result; + } + + /** + * forward each sub-plan to its corresponding data group, if some groups goes wrong, the error + * messages from each group will be compacted into one string. + * + * @param planGroupMap sub-plan -> data group pairs + */ + private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGroupMap) { + List<String> errorCodePartitionGroups = new ArrayList<>(); + TSStatus tmpStatus; + boolean allRedirect = true; + EndPoint endPoint = null; + for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) { + tmpStatus = forwardToSingleGroup(entry); + if (tmpStatus.isSetRedirectNode()) { + endPoint = tmpStatus.getRedirectNode(); + } else { + allRedirect = false; + } + if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // execution failed, record the error message + errorCodePartitionGroups.add(String.format("[%s@%s:%s]", + tmpStatus.getCode(), entry.getValue().getHeader(), + tmpStatus.getMessage())); + } + } + TSStatus status; + if (errorCodePartitionGroups.isEmpty()) { - status = StatusUtils.OK; + if (allRedirect) { - status = StatusUtils.getStatus(status, endPoint); ++ status = new TSStatus(); ++ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); ++ } else { ++ status = StatusUtils.OK; + } + } else { + status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, + MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString()); + } + return status; + } + + + /** + * Forward each sub-plan to its belonging data group, and combine responses from the groups. + * + * @param planGroupMap sub-plan -> data group pairs + */ + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning + private TSStatus forwardMultiSubPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, + PhysicalPlan parentPlan) { + List<String> errorCodePartitionGroups = new ArrayList<>(); + TSStatus tmpStatus; + TSStatus[] subStatus = null; + boolean noFailure = true; + boolean isBatchFailure = false; + EndPoint endPoint = null; + int totalRowNum = 0; + // send sub-plans to each belonging data group and collect results + for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) { + tmpStatus = forwardToSingleGroup(entry); + logger.debug("{}: from {},{},{}", name, entry.getKey(), entry.getValue(), tmpStatus); + noFailure = + (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure; + isBatchFailure = (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) + || isBatchFailure; + if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + if (parentPlan instanceof InsertTabletPlan) { + totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount(); + } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) { + totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size(); + } + if (subStatus == null) { + subStatus = new TSStatus[totalRowNum]; + Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); + } + // set the status from one group to the proper positions of the overall status + if (parentPlan instanceof InsertTabletPlan) { + PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), subStatus, + tmpStatus.subStatus.toArray(new TSStatus[]{})); + } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) { + CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey(); + for (int i = 0; i < subPlan.getIndexes().size(); i++) { + subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i); + } + } + } + if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // execution failed, record the error message + errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]", + tmpStatus.getCode(), entry.getValue().getHeader(), + tmpStatus.getMessage(), tmpStatus.subStatus)); + } + if (parentPlan instanceof InsertTabletPlan && tmpStatus.isSetRedirectNode() && + ((InsertTabletPlan) entry.getKey()).getMaxTime() == ((InsertTabletPlan) parentPlan) + .getMaxTime()) { + endPoint = tmpStatus.getRedirectNode(); + } + } + + if (parentPlan instanceof CreateMultiTimeSeriesPlan && + !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) { + if (subStatus == null) { + subStatus = new TSStatus[totalRowNum]; + Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); + } + noFailure = false; + isBatchFailure = true; + for (Map.Entry<Integer, TSStatus> integerTSStatusEntry : ((CreateMultiTimeSeriesPlan) parentPlan) + .getResults().entrySet()) { + subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue(); + } + } + return concludeFinalStatus(noFailure, endPoint, isBatchFailure, subStatus, + errorCodePartitionGroups); + } + + private TSStatus concludeFinalStatus(boolean noFailure, EndPoint endPoint, + boolean isBatchFailure, TSStatus[] subStatus, + List<String> errorCodePartitionGroups) { + TSStatus status; + if (noFailure) { + status = StatusUtils.OK; + if (endPoint != null) { + status = StatusUtils.getStatus(status, endPoint); + } + } else if (isBatchFailure) { + status = RpcUtils.getStatus(Arrays.asList(subStatus)); + } else { + status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, + MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString()); + } + return status; + } + + + /** + * Forward a plan to the DataGroupMember of one node in the group. Only when all nodes time out, + * will a TIME_OUT be returned. + */ + private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) { + for (Node node : group) { + TSStatus status; + try { + // only data plans are partitioned, so it must be processed by its data server instead of + // meta server + if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + status = forwardDataPlanAsync(plan, node, group.getHeader()); + } else { + status = forwardDataPlanSync(plan, node, group.getHeader()); + } + } catch (IOException e) { + status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + if (!StatusUtils.TIME_OUT.equals(status)) { + if (!status.isSetRedirectNode()) { + status.setRedirectNode(new EndPoint(node.getIp(), node.getClientPort())); + } + return status; + } else { + logger.warn("Forward {} to {} timed out", plan, node); + } + } + logger.warn("Forward {} to {} timed out", plan, group); + return StatusUtils.TIME_OUT; + } + + /** + * Forward a non-query plan to the data port of "receiver" + * + * @param plan a non-query plan + * @param header to determine which DataGroupMember of "receiver" will process the request. + * @return a TSStatus indicating if the forwarding is successful. + */ + private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node header) + throws IOException { + RaftService.AsyncClient client = metaGroupMember.getClientProvider().getAsyncDataClient(receiver, + RaftServer.getWriteOperationTimeoutMS()); + return this.metaGroupMember.forwardPlanAsync(plan, receiver, header, client); + } + + private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header) + throws IOException { + RaftService.Client client = null; + try { + client = metaGroupMember.getClientProvider().getSyncDataClient(receiver, + RaftServer.getWriteOperationTimeoutMS()); + } catch (TException e) { + throw new IOException(e); + } + return this.metaGroupMember.forwardPlanSync(plan, receiver, header, client); + } + + /** + * Get a thrift client that will connect to "node" using the data port. + * + * @param node the node to be connected + * @param timeout timeout threshold of connection + */ + public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException { + return metaGroupMember.getClientProvider().getAsyncDataClient(node, timeout); + } + + public Node getThisNode() { + return thisNode; + } + + /** + * Get a thrift client that will connect to "node" using the data port. + * + * @param node the node to be connected + * @param timeout timeout threshold of connection + */ + public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException { + return metaGroupMember.getClientProvider().getSyncDataClient(node, timeout); + } + } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java index aaa03a4,d7dd5f9..705000f --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java @@@ -19,7 -19,6 +19,8 @@@ package org.apache.iotdb.cluster.log.applier; +import org.apache.iotdb.cluster.exception.ChangeMembershipException; ++import org.apache.iotdb.cluster.exception.UnsupportedPlanException; import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; @@@ -70,19 -60,11 +71,20 @@@ public class MetaLogApplier extends Bas } else { logger.error("Unsupported log: {} {}", log.getClass().getName(), log); } - } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException e) { - } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) { ++ } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException | UnsupportedPlanException e) { logger.debug("Exception occurred when executing {}", log, e); log.setException(e); } finally { log.setApplied(true); } } + - private void sendLogToAllDataGroups(Log log) throws ChangeMembershipException { ++ private void sendLogToAllDataGroups(Log log) ++ throws ChangeMembershipException, UnsupportedPlanException { + LogPlan plan = new LogPlan(log.serialize()); - TSStatus status = member.executeNonQueryPlan(plan); ++ TSStatus status = member.processPartitionedPlan(plan); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status)); + } + } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java index 1e86e11,ff650e3..c92a482 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java @@@ -85,19 -81,4 +85,20 @@@ public class MetaSingleSnapshotLogManag snapshot.setLastLogTerm(term); return snapshot; } + + @Override + void applyEntries(List<Log> entries) { + for (Log entry : entries) { + if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) { + blockedUnappliedLogList.add(entry); + continue; + } + try { + ((MetaLogApplier)logApplier).apply(entry, metaGroupMember.getCharacter() == NodeCharacter.LEADER); + } catch (Exception e) { ++ logger.error("Can not apply log {}", entry, e); + entry.setException(e); + } + } + } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java index c57ee06,c27cf9e..f774567 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java @@@ -114,9 -114,9 +114,9 @@@ public abstract class RaftLogManager * Each time new logs are appended, this condition will be notified so logs that have larger * indices but arrived earlier can proceed. */ - private final Object logUpdateCondition = new Object(); + private final Object[] logUpdateConditions = new Object[1024]; - private List<Log> blockedUnappliedLogList; + protected List<Log> blockedUnappliedLogList; protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name) { this.logApplier = applier; diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java index c1ab3e5,dd63e2f..fe9b8f7 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java @@@ -303,15 -306,13 +307,13 @@@ public class ClientServer extends TSSer GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>()); try { if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { - AsyncDataClient client = metaGroupMember - .getClientProvider().getAsyncDataClient(queriedNode, + AsyncDataClient client = coordinator.getAsyncDataClient(queriedNode, - RaftServer.getReadOperationTimeoutMS()); - client.endQuery(header, coordinator.getThisNode(), queryId, handler); + RaftServer.getReadOperationTimeoutMS()); - client.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId, handler); ++ client.endQuery(header.getNode(), header.getRaftId(), coordinator.getThisNode(), queryId, handler); } else { - SyncDataClient syncDataClient = metaGroupMember - .getClientProvider().getSyncDataClient(queriedNode, + SyncDataClient syncDataClient = coordinator.getSyncDataClient(queriedNode, - RaftServer.getReadOperationTimeoutMS()); - syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); + RaftServer.getReadOperationTimeoutMS()); - syncDataClient.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId); ++ syncDataClient.endQuery(header.getNode(), header.getRaftId(), coordinator.getThisNode(), queryId); } } catch (IOException | TException e) { logger.error("Cannot end query {} in {}", queryId, queriedNode); diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java index d198039,8df621d..f97d181 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java @@@ -339,8 -344,18 +344,18 @@@ public class MetaClusterServer extends } @Override - public void removeHardLink(String hardLinkPath, + public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) { - asyncService.removeHardLink(hardLinkPath, resultHandler); + asyncService.removeHardLink(hardLinkPath, raftId, resultHandler); } + + @Override + public void handshake(Node sender) { + syncService.handshake(sender); + } + + @Override + public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) { + asyncService.handshake(sender, resultHandler); + } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index df45b53,f8811e9..7b4f663 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@@ -75,15 -75,15 +75,16 @@@ import org.apache.iotdb.cluster.rpc.thr import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; import org.apache.iotdb.cluster.server.NodeCharacter; - import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport; - import org.apache.iotdb.cluster.server.Peer; + import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport; + import org.apache.iotdb.cluster.server.monitor.NodeStatusManager; + import org.apache.iotdb.cluster.server.monitor.Peer; import org.apache.iotdb.cluster.server.PullSnapshotHintService; import org.apache.iotdb.cluster.server.Response; - import org.apache.iotdb.cluster.server.Timer; - import org.apache.iotdb.cluster.server.Timer.Statistic; + import org.apache.iotdb.cluster.server.monitor.Timer; + import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@@ -824,8 -768,8 +825,8 @@@ public class DataGroupMember extends Ra lastReportedLogIndex = logManager.getLastLogIndex(); return new DataMemberReport(character, leader.get(), term.get(), logManager.getLastLogTerm(), lastReportedLogIndex, logManager.getCommitLogIndex(), - logManager.getCommitLogTerm(), getHeader(), readOnly, + logManager.getCommitLogTerm(), getHeader(), getRaftGroupId(), readOnly, - QueryCoordinator.getINSTANCE() + NodeStatusManager.getINSTANCE() .getLastResponseLatency(getHeader()), lastHeartbeatReceivedTime, prevLastLogIndex, logManager.getMaxHaveAppliedCommitIndex()); } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 1534323,74b9d3b..5484bb5 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@@ -19,42 -19,6 +19,7 @@@ package org.apache.iotdb.cluster.server.member; - import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC; - import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult; - - import java.io.BufferedInputStream; - import java.io.BufferedOutputStream; - import java.io.BufferedReader; - import java.io.BufferedWriter; - import java.io.DataInputStream; - import java.io.DataOutputStream; - import java.io.File; - import java.io.FileInputStream; - import java.io.FileOutputStream; - import java.io.FileReader; - import java.io.FileWriter; - import java.io.IOException; - import java.nio.ByteBuffer; - import java.nio.file.Files; - import java.nio.file.Paths; - import java.util.ArrayList; - import java.util.Arrays; - import java.util.Collection; - import java.util.HashMap; - import java.util.HashSet; - import java.util.List; - import java.util.Map; +import java.util.Map.Entry; - import java.util.Objects; - import java.util.Set; - import java.util.concurrent.ExecutorService; - import java.util.concurrent.Executors; - import java.util.concurrent.ScheduledExecutorService; - import java.util.concurrent.ScheduledThreadPoolExecutor; - import java.util.concurrent.TimeUnit; - import java.util.concurrent.atomic.AtomicBoolean; - import java.util.concurrent.atomic.AtomicInteger; - import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.cluster.client.DataClientProvider; import org.apache.iotdb.cluster.client.async.AsyncClientPool; import org.apache.iotdb.cluster.client.async.AsyncMetaClient; @@@ -65,17 -29,15 +30,17 @@@ import org.apache.iotdb.cluster.client. import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient; import org.apache.iotdb.cluster.config.ClusterConstant; import org.apache.iotdb.cluster.config.ClusterDescriptor; + import org.apache.iotdb.cluster.coordinator.Coordinator; import org.apache.iotdb.cluster.exception.AddSelfException; +import org.apache.iotdb.cluster.exception.CheckConsistencyException; import org.apache.iotdb.cluster.exception.ConfigInconsistentException; import org.apache.iotdb.cluster.exception.EmptyIntervalException; import org.apache.iotdb.cluster.exception.LogExecutionException; import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException; import org.apache.iotdb.cluster.exception.SnapshotInstallationException; import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; -import org.apache.iotdb.cluster.log.Log; +import org.apache.iotdb.cluster.exception.UnknownLogTypeException; +import org.apache.iotdb.cluster.exception.UnsupportedPlanException; - import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.LogApplier; import org.apache.iotdb.cluster.log.applier.MetaLogApplier; import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; @@@ -89,9 -50,8 +54,7 @@@ import org.apache.iotdb.cluster.partiti import org.apache.iotdb.cluster.partition.PartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.query.ClusterPlanRouter; - import org.apache.iotdb.cluster.query.manage.QueryCoordinator; import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; --import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse; import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest; import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; @@@ -107,12 -64,9 +70,8 @@@ import org.apache.iotdb.cluster.server. import org.apache.iotdb.cluster.server.DataClusterServer; import org.apache.iotdb.cluster.server.HardLinkCleaner; import org.apache.iotdb.cluster.server.NodeCharacter; - import org.apache.iotdb.cluster.server.NodeReport; - import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.Response; - import org.apache.iotdb.cluster.server.Timer; --import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler; import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer; @@@ -155,6 -101,42 +117,40 @@@ import org.apache.thrift.transport.TTra import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.io.BufferedInputStream; + import java.io.BufferedOutputStream; + import java.io.BufferedReader; + import java.io.BufferedWriter; + import java.io.DataInputStream; + import java.io.DataOutputStream; + import java.io.File; + import java.io.FileInputStream; + import java.io.FileOutputStream; + import java.io.FileReader; + import java.io.FileWriter; + import java.io.IOException; + import java.nio.ByteBuffer; + import java.nio.file.Files; + import java.nio.file.Paths; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Set; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ScheduledThreadPoolExecutor; + import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; + + import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC; + import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult; + @SuppressWarnings("java:S1135") public class MetaGroupMember extends RaftMember { @@@ -172,9 -154,7 +168,10 @@@ * in case of data loss, some file changes would be made to a temporary file first */ private static final String TEMP_SUFFIX = ".tmp"; + + private static final String MSG_MULTIPLE_ERROR = "The following errors occurred when executing " + + "the query, please retry or contact the DBA: "; + private static final Logger logger = LoggerFactory.getLogger(MetaGroupMember.class); /** * when joining a cluster this node will retry at most "DEFAULT_JOIN_RETRY" times before returning @@@ -265,12 -264,12 +276,12 @@@ public MetaGroupMember() { } - public MetaGroupMember(TProtocolFactory factory, Node thisNode) throws QueryProcessException { + public MetaGroupMember(TProtocolFactory factory, Node thisNode, Coordinator coordinator) throws QueryProcessException { super("Meta", new AsyncClientPool(new AsyncMetaClient.FactoryAsync(factory)), new SyncClientPool(new SyncMetaClient.FactorySync(factory)), - new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false), + new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)), new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory))); - allNodes = new ArrayList<>(); + allNodes = new PartitionGroup(); initPeerMap(); dataClientProvider = new DataClientProvider(factory); @@@ -1308,415 -1402,6 +1319,415 @@@ } /** + * A non-partitioned plan (like DeleteData) should be executed on all data group nodes, so the + * DataGroupLeader should take the responsible to make sure that every node receives the plan. + * Thus the plan will be processed locally only by the DataGroupLeader and forwarded by non-leader + * nodes. + */ + private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) { + if (plan instanceof DeleteTimeSeriesPlan || plan instanceof DeletePlan) { + try { + // as delete related plans may have abstract paths (paths with wildcards), we convert + // them to full paths so the executor nodes will not need to query the metadata holders, + // eliminating the risk that when they are querying the metadata holders, the timeseries + // has already been deleted + ((CMManager) IoTDB.metaManager).convertToFullPaths(plan); + } catch (PathNotExistException e) { + if (plan.getPaths().isEmpty()) { + // only reports an error when there is no matching path + return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage()); + } + } + } + try { + syncLeaderWithConsistencyCheck(true); + List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups(); + logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size()); + return forwardPlan(globalGroups, plan); + } catch (CheckConsistencyException e) { + logger.debug("Forwarding global data plan {} to meta leader {}", plan, leader.get()); + waitLeader(); + return forwardPlan(plan, leader.get(), null); + } + } + + /** + * A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to + * a data group. And these sub-plans will be sent to and executed on the corresponding groups + * separately. + */ + public TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException { + logger.debug("{}: Received a partitioned plan {}", name, plan); + if (partitionTable == null) { + logger.debug("{}: Partition table is not ready", name); + return StatusUtils.PARTITION_TABLE_NOT_READY; + } + + // split the plan into sub-plans that each only involve one data group + Map<PhysicalPlan, PartitionGroup> planGroupMap; + try { + planGroupMap = splitPlan(plan); + } catch (CheckConsistencyException checkConsistencyException) { + return StatusUtils + .getStatus(StatusUtils.CONSISTENCY_FAILURE, checkConsistencyException.getMessage()); + } + + // the storage group is not found locally + if (planGroupMap == null || planGroupMap.isEmpty()) { + if ((plan instanceof InsertPlan || plan instanceof CreateTimeSeriesPlan + || plan instanceof CreateMultiTimeSeriesPlan) + && config.isEnableAutoCreateSchema()) { + logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan); + try { + ((CMManager) IoTDB.metaManager).createSchema(plan); + return processPartitionedPlan(plan); + } catch (MetadataException | CheckConsistencyException e) { + logger.error( + String.format("Failed to set storage group or create timeseries, because %s", e)); + } + } + logger.error("{}: Cannot find storage groups for {}", name, plan); + return StatusUtils.NO_STORAGE_GROUP; + } + logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap); + return forwardPlan(planGroupMap, plan); + } + + /** + * split a plan into several sub-plans, each belongs to only one data group. + */ + private Map<PhysicalPlan, PartitionGroup> splitPlan(PhysicalPlan plan) + throws UnsupportedPlanException, CheckConsistencyException { + Map<PhysicalPlan, PartitionGroup> planGroupMap = null; + try { + planGroupMap = router.splitAndRoutePlan(plan); + } catch (StorageGroupNotSetException e) { + // synchronize with the leader to see if this node has unpulled storage groups + syncLeaderWithConsistencyCheck(true); + try { + planGroupMap = router.splitAndRoutePlan(plan); + } catch (MetadataException | UnknownLogTypeException ex) { + // ignore + } + } catch (MetadataException | UnknownLogTypeException e) { + logger.error("Cannot route plan {}", plan, e); + } + return planGroupMap; + } + + /** + * Forward plans to the DataGroupMember of one node in the corresponding group. Only when all + * nodes time out, will a TIME_OUT be returned. + * + * @param planGroupMap sub-plan -> belong data group pairs + */ + public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) { + // the error codes from the groups that cannot execute the plan + TSStatus status; + if (planGroupMap.size() == 1) { + status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next()); + } else { + if (plan instanceof InsertTabletPlan || plan instanceof CreateMultiTimeSeriesPlan) { + // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, each will correspond to a TSStatus as its + // execution result, as the plan is split and the sub-plans may have interleaving ranges, + // we must assure that each TSStatus is placed to the right position + // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2 + // belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the + // failure and success should be placed into proper positions in TSStatus.subStatus + status = forwardMultiSubPlan(planGroupMap, plan); + } else { + status = forwardToMultipleGroup(planGroupMap); + } + } + if (plan instanceof InsertPlan + && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode() + && config.isEnableAutoCreateSchema()) { + TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan)); + if (tmpStatus != null) { + status = tmpStatus; + } + } - if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && status - .isSetRedirectNode()) { - status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); - } + logger.debug("{}: executed {} with answer {}", name, plan, status); + return status; + } + + private TSStatus createTimeseriesForFailedInsertion( + Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) { + // try to create timeseries + if (plan.getFailedMeasurements() != null) { + plan.getPlanFromFailed(); + } + boolean hasCreate; + try { + hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan); + } catch (IllegalPathException | CheckConsistencyException e) { + return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + if (hasCreate) { + return forwardPlan(planGroupMap, plan); + } else { + logger.error("{}, Cannot auto create timeseries.", thisNode); + } + return null; + } + + /** + * Forward each sub-plan to its belonging data group, and combine responses from the groups. + * + * @param planGroupMap sub-plan -> data group pairs + */ + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning + private TSStatus forwardMultiSubPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, + PhysicalPlan parentPlan) { + List<String> errorCodePartitionGroups = new ArrayList<>(); + TSStatus tmpStatus; + TSStatus[] subStatus = null; + boolean noFailure = true; + boolean isBatchFailure = false; + EndPoint endPoint = null; + int totalRowNum = 0; + // send sub-plans to each belonging data group and collect results + for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) { + tmpStatus = forwardToSingleGroup(entry); + logger.debug("{}: from {},{},{}", name, entry.getKey(), entry.getValue(), tmpStatus); + noFailure = + (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure; + isBatchFailure = (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) + || isBatchFailure; + if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + if (parentPlan instanceof InsertTabletPlan) { + totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount(); + } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) { + totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size(); + } + if (subStatus == null) { + subStatus = new TSStatus[totalRowNum]; + Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); + } + // set the status from one group to the proper positions of the overall status + if (parentPlan instanceof InsertTabletPlan) { + PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), subStatus, + tmpStatus.subStatus.toArray(new TSStatus[]{})); + } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) { + CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey(); + for (int i = 0; i < subPlan.getIndexes().size(); i++) { + subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i); + } + } + } + if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // execution failed, record the error message + errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]", + tmpStatus.getCode(), entry.getValue().getHeader(), + tmpStatus.getMessage(), tmpStatus.subStatus)); + } + if (parentPlan instanceof InsertTabletPlan && tmpStatus.isSetRedirectNode() && + ((InsertTabletPlan) entry.getKey()).getMaxTime() == ((InsertTabletPlan) parentPlan) + .getMaxTime()) { + endPoint = tmpStatus.getRedirectNode(); + } + } + + if (parentPlan instanceof CreateMultiTimeSeriesPlan && + !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) { + if (subStatus == null) { + subStatus = new TSStatus[totalRowNum]; + Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS); + } + noFailure = false; + isBatchFailure = true; + for (Entry<Integer, TSStatus> integerTSStatusEntry : ((CreateMultiTimeSeriesPlan) parentPlan) + .getResults().entrySet()) { + subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue(); + } + } + return concludeFinalStatus(noFailure, endPoint, isBatchFailure, subStatus, + errorCodePartitionGroups); + } + + private TSStatus concludeFinalStatus(boolean noFailure, EndPoint endPoint, + boolean isBatchFailure, TSStatus[] subStatus, List<String> errorCodePartitionGroups) { + TSStatus status; + if (noFailure) { + status = StatusUtils.OK; + if (endPoint != null) { + status = StatusUtils.getStatus(status, endPoint); + } + } else if (isBatchFailure) { + status = RpcUtils.getStatus(Arrays.asList(subStatus)); + } else { + status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, + MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString()); + } + return status; + } + + private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, PartitionGroup> entry) { + TSStatus result; + if (entry.getValue().contains(thisNode)) { + // the query should be handled by a group the local node is in, handle it with in the group + long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP + .getOperationStartTime(); + logger.debug("Execute {} in a local group of {}", entry.getKey(), + entry.getValue().getHeader()); + result = getLocalDataMember(entry.getValue().getHeader(), entry.getValue().getId()) + .executeNonQueryPlan(entry.getKey()); + Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP + .calOperationCostTimeFromStart(startTime); + } else { + // forward the query to the group that should handle it + long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP + .getOperationStartTime(); + logger.debug("Forward {} to a remote group of {}", entry.getKey(), + entry.getValue().getHeader()); + result = forwardPlan(entry.getKey(), entry.getValue()); + Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP + .calOperationCostTimeFromStart(startTime); + } + return result; + } + + /** + * forward each sub-plan to its corresponding data group, if some groups goes wrong, the error + * messages from each group will be compacted into one string. + * + * @param planGroupMap sub-plan -> data group pairs + */ + private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGroupMap) { + List<String> errorCodePartitionGroups = new ArrayList<>(); + TSStatus tmpStatus; + boolean allRedirect = true; + EndPoint endPoint = null; + for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) { + tmpStatus = forwardToSingleGroup(entry); + if (tmpStatus.isSetRedirectNode()) { + endPoint = tmpStatus.getRedirectNode(); + } else { + allRedirect = false; + } + if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // execution failed, record the error message + errorCodePartitionGroups.add(String.format("[%s@%s:%s]", + tmpStatus.getCode(), entry.getValue().getHeader(), + tmpStatus.getMessage())); + } + } + TSStatus status; + if (errorCodePartitionGroups.isEmpty()) { - status = StatusUtils.OK; + if (allRedirect) { - status = StatusUtils.getStatus(status, endPoint); ++ status = new TSStatus(); ++ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); ++ } else { ++ status = StatusUtils.OK; + } + } else { + status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, + MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString()); + } + return status; + } + + /** + * Forward a plan to all DataGroupMember groups. Only when all nodes time out, will a TIME_OUT be + * returned. The error messages from each group (if any) will be compacted into one string. + * + * @para plan + */ + private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan plan) { + // the error codes from the groups that cannot execute the plan + TSStatus status; + List<String> errorCodePartitionGroups = new ArrayList<>(); + for (PartitionGroup partitionGroup : partitionGroups) { + if (partitionGroup.contains(thisNode)) { + // the query should be handled by a group the local node is in, handle it with in the group + logger.debug("Execute {} in a local group of {}", plan, partitionGroup.getHeader()); + status = getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()) + .executeNonQueryPlan(plan); + } else { + // forward the query to the group that should handle it + logger.debug("Forward {} to a remote group of {}", plan, + partitionGroup.getHeader()); + status = forwardPlan(plan, partitionGroup); + } + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && ( + !(plan instanceof DeleteTimeSeriesPlan) || + status.getCode() != TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())) { + // execution failed, record the error message + errorCodePartitionGroups.add(String.format("[%s@%s:%s]", + status.getCode(), partitionGroup.getHeader(), + status.getMessage())); + } + } + if (errorCodePartitionGroups.isEmpty()) { + status = StatusUtils.OK; + } else { + status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, + MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString()); + } + logger.debug("{}: executed {} with answer {}", name, plan, status); + return status; + } + + /** + * Forward a plan to the DataGroupMember of one node in the group. Only when all nodes time out, + * will a TIME_OUT be returned. + */ + private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) { + for (Node node : group) { + TSStatus status; + try { + // only data plans are partitioned, so it must be processed by its data server instead of + // meta server + if (config.isUseAsyncServer()) { + status = forwardDataPlanAsync(plan, node, group.getHeader()); + } else { + status = forwardDataPlanSync(plan, node, group.getHeader()); + } + } catch (IOException e) { + status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage()); + } + if (!StatusUtils.TIME_OUT.equals(status)) { + if (!status.isSetRedirectNode()) { + status.setRedirectNode(new EndPoint(node.getIp(), node.getClientPort())); + } + return status; + } else { + logger.warn("Forward {} to {} timed out", plan, node); + } + } + logger.warn("Forward {} to {} timed out", plan, group); + return StatusUtils.TIME_OUT; + } + + /** + * Forward a non-query plan to the data port of "receiver" + * + * @param plan a non-query plan + * @param header to determine which DataGroupMember of "receiver" will process the request. + * @return a TSStatus indicating if the forwarding is successful. + */ + private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node header) + throws IOException { + RaftService.AsyncClient client = getClientProvider().getAsyncDataClient(receiver, + RaftServer.getWriteOperationTimeoutMS()); + return forwardPlanAsync(plan, receiver, header, client); + } + + private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header) + throws IOException { + Client client = null; + try { + client = getClientProvider().getSyncDataClient(receiver, + RaftServer.getWriteOperationTimeoutMS()); + } catch (TException e) { + throw new IOException(e); + } + return forwardPlanSync(plan, receiver, header, client); + } + + /** ++======= ++>>>>>>> master * Get the data groups that should be queried when querying "path" with "filter". First, the time * interval qualified by the filter will be extracted. If any side of the interval is open, query * all groups. Otherwise compute all involved groups w.r.t. the time partitioning. @@@ -2096,7 -1764,7 +2108,11 @@@ this.dataClientProvider = dataClientProvider; } + public void setRouter(ClusterPlanRouter router) { + this.router = router; + } ++ + public void handleHandshake(Node sender) { + NodeStatusManager.getINSTANCE().activate(sender); + } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 121892b,fc83ba4..d05ddd9 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@@ -85,6 -84,6 +85,7 @@@ import org.apache.iotdb.cluster.utils.C import org.apache.iotdb.cluster.utils.IOUtils; import org.apache.iotdb.cluster.utils.PlanSerializer; import org.apache.iotdb.cluster.utils.StatusUtils; ++import org.apache.iotdb.cluster.utils.nodetool.function.Status; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.IoTDBException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; @@@ -932,6 -876,6 +907,7 @@@ public abstract class RaftMember try { if (appendLogInGroup(log)) { ++ TSStatus res = StatusUtils.OK; return StatusUtils.OK; } } catch (LogExecutionException e) { @@@ -1221,9 -1165,9 +1197,9 @@@ return forwardPlanAsync(plan, receiver, header, client); } - TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, Node header, AsyncClient client) { + public TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, Node header, AsyncClient client) { try { - TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan, header, receiver); + TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan, header, receiver, getRaftGroupId()); if (tsStatus == null) { tsStatus = StatusUtils.TIME_OUT; logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver); @@@ -1249,10 -1193,9 +1225,10 @@@ return forwardPlanSync(plan, receiver, header, client); } - TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node header, Client client) { + public TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node header, Client client) { try { ExecutNonQueryReq req = new ExecutNonQueryReq(); + req.setRaftId(getRaftGroupId()); req.setPlanBytes(PlanSerializer.getInstance().serialize(plan)); if (header != null) { req.setHeader(header); diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java index 3b2df98,114aa5a..a555e17 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java @@@ -197,10 -195,14 +197,16 @@@ public class MetaAsyncService extends B * @param resultHandler */ @Override - public void exile(AsyncMethodCallback<Void> resultHandler) { - metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode()); + public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) { + RemoveNodeLog removeNodeLog = new RemoveNodeLog(); + removeNodeLog.deserialize(removeNodeLogBuffer); + metaGroupMember.applyRemoveNode(removeNodeLog); resultHandler.onComplete(null); } + + @Override + public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) { + metaGroupMember.handleHandshake(sender); + resultHandler.onComplete(null); + } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java index 48c0e58,ec1519a..d88f4f5 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java @@@ -190,9 -188,12 +190,14 @@@ public class MetaSyncService extends Ba * must tell it directly. */ @Override - public void exile() { - metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode()); + public void exile(ByteBuffer removeNodeLogBuffer) { + RemoveNodeLog removeNodeLog = new RemoveNodeLog(); + removeNodeLog.deserialize(removeNodeLogBuffer); + metaGroupMember.applyRemoveNode(removeNodeLog); } + + @Override + public void handshake(Node sender) { + metaGroupMember.handleHandshake(sender); + } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java index a0e5f56,28922ad..890b402 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java @@@ -28,9 -27,8 +28,9 @@@ import org.apache.iotdb.cluster.partiti import org.apache.iotdb.cluster.partition.PartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.server.MetaClusterServer; - import org.apache.iotdb.cluster.server.Timer; + import org.apache.iotdb.cluster.server.monitor.Timer; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.exception.StartupException; diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index fbffb7f,6cc5418..65bb031 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@@ -988,70 -971,74 +993,74 @@@ public class DataGroupMemberTest extend request.setTimeFilterBytes(SerializeUtils.serializeFilter(timeFilter)); QueryContext queryContext = new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)); - request.setQueryId(queryContext.getQueryId()); - request.setRequestor(TestUtils.getNode(0)); - request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal()); - request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0))); - request.setAscending(true); - - DataGroupMember dataGroupMember; - AtomicReference<Long> resultRef; - GenericHandler<Long> handler; - Long executorId; - AtomicReference<List<ByteBuffer>> aggrResultRef; - GenericHandler<List<ByteBuffer>> aggrResultHandler; - List<ByteBuffer> byteBuffers; - List<AggregateResult> aggregateResults; - Object[] answers; - // get an executor from a node holding this timeseries - request.setHeader(TestUtils.getNode(10)); - dataGroupMember = getDataGroupMember(TestUtils.getNode(10)); try { - resultRef = new AtomicReference<>(); - handler = new GenericHandler<>(TestUtils.getNode(0), resultRef); - new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler); - executorId = resultRef.get(); - assertEquals(1L, (long) executorId); - - // fetch result - aggrResultRef = new AtomicReference<>(); - aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef); - new DataAsyncService(dataGroupMember) - .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler); - - byteBuffers = aggrResultRef.get(); - assertNotNull(byteBuffers); - aggregateResults = new ArrayList<>(); - for (ByteBuffer byteBuffer : byteBuffers) { - aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer)); + request.setQueryId(queryContext.getQueryId()); + request.setRequestor(TestUtils.getNode(0)); + request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal()); + request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0))); + request.setAscending(true); + + DataGroupMember dataGroupMember; + AtomicReference<Long> resultRef; + GenericHandler<Long> handler; + Long executorId; + AtomicReference<List<ByteBuffer>> aggrResultRef; + GenericHandler<List<ByteBuffer>> aggrResultHandler; + List<ByteBuffer> byteBuffers; + List<AggregateResult> aggregateResults; + Object[] answers; + // get an executor from a node holding this timeseries + request.setHeader(TestUtils.getNode(10)); + dataGroupMember = getDataGroupMember(TestUtils.getNode(10)); + try { + resultRef = new AtomicReference<>(); + handler = new GenericHandler<>(TestUtils.getNode(0), resultRef); + new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler); + executorId = resultRef.get(); + assertEquals(1L, (long) executorId); + + // fetch result + aggrResultRef = new AtomicReference<>(); + aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef); + new DataAsyncService(dataGroupMember) - .getGroupByResult(TestUtils.getNode(10), executorId, 0, 20, aggrResultHandler); ++ .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler); + + byteBuffers = aggrResultRef.get(); + assertNotNull(byteBuffers); + aggregateResults = new ArrayList<>(); + for (ByteBuffer byteBuffer : byteBuffers) { + aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer)); + } + answers = new Object[]{15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0}; + checkAggregates(answers, aggregateResults); + } finally { + dataGroupMember.closeLogManager(); } - answers = new Object[]{15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0}; - checkAggregates(answers, aggregateResults); - } finally { - dataGroupMember.closeLogManager(); - } - // get an executor from a node not holding this timeseries - request.setHeader(TestUtils.getNode(30)); - dataGroupMember = getDataGroupMember(TestUtils.getNode(30)); - try { - resultRef = new AtomicReference<>(); - handler = new GenericHandler<>(TestUtils.getNode(0), resultRef); - request.timeFilterBytes.position(0); - new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler); - executorId = resultRef.get(); - assertEquals(-1L, (long) executorId); - - // fetch result - aggrResultRef = new AtomicReference<>(); - aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef); - new DataAsyncService(dataGroupMember) - .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler); - - byteBuffers = aggrResultRef.get(); - assertNull(byteBuffers); + // get an executor from a node not holding this timeseries + request.setHeader(TestUtils.getNode(30)); + dataGroupMember = getDataGroupMember(TestUtils.getNode(30)); + try { + resultRef = new AtomicReference<>(); + handler = new GenericHandler<>(TestUtils.getNode(0), resultRef); + request.timeFilterBytes.position(0); + new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler); + executorId = resultRef.get(); + assertEquals(-1L, (long) executorId); + + // fetch result + aggrResultRef = new AtomicReference<>(); + aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef); + new DataAsyncService(dataGroupMember) - .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler); ++ .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler); + + byteBuffers = aggrResultRef.get(); + assertNull(byteBuffers); + } finally { + dataGroupMember.closeLogManager(); + } } finally { - dataGroupMember.closeLogManager(); + QueryResourceManager.getInstance().endQuery(queryContext.getQueryId()); } } diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java index 8c1beef,e2a5767..d5a6515 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java @@@ -100,8 -100,11 +105,10 @@@ public class MemberTest prevUseAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(); preLogBufferSize = ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize(); ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true); - ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(4096); testThreadPool = Executors.newFixedThreadPool(4); prevLeaderWait = RaftMember.getWaitLeaderTimeMs(); + prevEnableWAL = IoTDBDescriptor.getInstance().getConfig().isEnableWal(); + IoTDBDescriptor.getInstance().getConfig().setEnableWal(false); RaftMember.setWaitLeaderTimeMs(10); allNodes = new PartitionGroup(); diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 94f5067,fe0adfc..12fe37e --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@@ -73,10 -67,8 +74,9 @@@ import org.apache.iotdb.cluster.metadat import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.partition.PartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; +import org.apache.iotdb.cluster.query.ClusterPlanRouter; import org.apache.iotdb.cluster.query.LocalQueryExecutor; import org.apache.iotdb.cluster.query.RemoteQueryContext; - import org.apache.iotdb.cluster.query.manage.QueryCoordinator; import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory; import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; @@@ -173,6 -163,6 +174,13 @@@ public class MetaGroupMemberTest extend RaftServer.setReadOperationTimeoutMS(1000); super.setUp(); ++ partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)) { ++ @Override ++ public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) { ++ return new RaftNode(TestUtils.getNode(0), 0); ++ } ++ }; ++ testMetaMember.setPartitionTable(partitionTable); dummyResponse.set(Response.RESPONSE_AGREE); testMetaMember.setAllNodes(allNodes); @@@ -186,14 -176,8 +194,14 @@@ buildDataGroups(dataClusterServer); testMetaMember.getThisNode().setNodeIdentifier(0); + testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()){ + @Override + protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) { + return Collections.singletonMap(plan, partitionTable.getHeaderGroup(testMetaMember.getThisNode())); + } + }); mockDataClusterServer = false; - QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember); + NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember); exiledNode = null; System.out.println("Init term of metaGroupMember: " + testMetaMember.getTerm().get()); } @@@ -307,9 -286,8 +315,9 @@@ return resp; } + @Override protected MetaGroupMember getMetaGroupMember(Node node) throws QueryProcessException { - MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node) { + MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node, new Coordinator()) { @Override public DataClusterServer getDataClusterServer() { @@@ -893,7 -914,7 +949,8 @@@ public void testProcessValidHeartbeatReq() throws QueryProcessException { System.out.println("Start testProcessValidHeartbeatReq()"); MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10)); + partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)); + testMetaMember.setCoordinator(new Coordinator()); try { HeartBeatRequest request = new HeartBeatRequest(); request.setRequireIdentifier(true); diff --cc server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index b03f984,30d5d52..c9d130e --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@@ -500,10 -510,12 +510,10 @@@ public class StorageEngine implements I * @throws StorageGroupNotSetException */ public void closeStorageGroupProcessor(PartialPath storageGroupPath, long partitionId, - boolean isSeq, - boolean isSync) - throws StorageGroupNotSetException { + boolean isSeq, boolean isSync) throws StorageGroupNotSetException { StorageGroupProcessor processor = processorMap.get(storageGroupPath); if (processor == null) { - throw new StorageGroupNotSetException(storageGroupPath.getFullPath()); + throw new StorageGroupNotSetException(storageGroupPath.getFullPath(), true); } logger.info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}", diff --cc server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java index aa31a4e,e733813..8928110 --- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java @@@ -114,115 -115,65 +115,66 @@@ public class MLogWriter implements Auto } } - public synchronized void createTimeseries(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException { - try { - putLog(createTimeSeriesPlan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); - } + public void createTimeseries(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException { + putLog(createTimeSeriesPlan); } - public synchronized void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException { - try { - putLog(deleteTimeSeriesPlan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); - } + public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException { + putLog(deleteTimeSeriesPlan); } - public synchronized void setStorageGroup(PartialPath storageGroup) throws IOException { - try { - SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup); - putLog(plan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); - } + public void setStorageGroup(PartialPath storageGroup) throws IOException { + SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup); + putLog(plan); } - public synchronized void deleteStorageGroup(PartialPath storageGroup) throws IOException { - try { - DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup)); - putLog(plan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); - } + public void deleteStorageGroup(PartialPath storageGroup) throws IOException { + DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup)); + putLog(plan); } - public synchronized void setTTL(PartialPath storageGroup, long ttl) throws IOException { - try { - SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl); - putLog(plan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); - } + public void setTTL(PartialPath storageGroup, long ttl) throws IOException { + SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl); + putLog(plan); } - public synchronized void changeOffset(PartialPath path, long offset) throws IOException { - try { - ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset); - putLog(plan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); - } + public void changeOffset(PartialPath path, long offset) throws IOException { + ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset); + putLog(plan); } - public synchronized void changeAlias(PartialPath path, String alias) throws IOException { - try { - ChangeAliasPlan plan = new ChangeAliasPlan(path, alias); - putLog(plan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); - } + public void changeAlias(PartialPath path, String alias) throws IOException { + ChangeAliasPlan plan = new ChangeAliasPlan(path, alias); + putLog(plan); } - public synchronized void serializeMNode(MNode node) throws IOException { - try { - int childSize = 0; - if (node.getChildren() != null) { - childSize = node.getChildren().size(); - } - MNodePlan plan = new MNodePlan(node.getName(), childSize); - putLog(plan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); + public void serializeMNode(MNode node) throws IOException { + int childSize = 0; + if (node.getChildren() != null) { + childSize = node.getChildren().size(); } + MNodePlan plan = new MNodePlan(node.getName(), childSize); + putLog(plan); } - public synchronized void serializeMeasurementMNode(MeasurementMNode node) throws IOException { - try { - int childSize = 0; - if (node.getChildren() != null) { - childSize = node.getChildren().size(); - } - MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(), - node.getOffset(), childSize, node.getSchema()); - putLog(plan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); ++ + public void serializeMeasurementMNode(MeasurementMNode node) throws IOException { + int childSize = 0; + if (node.getChildren() != null) { + childSize = node.getChildren().size(); } + MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(), + node.getOffset(), childSize, node.getSchema()); + putLog(plan); } - public synchronized void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException { - try { - int childSize = 0; - if (node.getChildren() != null) { - childSize = node.getChildren().size(); - } - StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize); - putLog(plan); - } catch (BufferOverflowException e) { - throw new IOException( - LOG_TOO_LARGE_INFO, e); + public void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException { + int childSize = 0; + if (node.getChildren() != null) { + childSize = node.getChildren().size(); } + StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize); + putLog(plan); } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning diff --cc server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java index 0d64b30,e9bf655..dc553be --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java @@@ -366,7 -361,8 +366,8 @@@ public abstract class PhysicalPlan REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER, DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES, ALTER_TIMESERIES, FLUSH, CREATE_INDEX, DROP_INDEX, - CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG - CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, ++ CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG, + BATCH_INSERT_ONE_DEVICE } public long getIndex() {
