This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch cluster- in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 06cbe01bb9d42c47fbf0dc655218dcc3dd5dc8c8 Merge: 9fdbbad 6dc45c6 Author: xiangdong huang <[email protected]> AuthorDate: Thu Sep 23 19:43:01 2021 +0800 merge with master .github/workflows/main-unix.yml | 2 +- .github/workflows/main-win.yml | 2 +- README.md | 2 +- README_ZH.md | 2 +- RELEASE_NOTES.md | 2 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 20 +- checkstyle.xml | 2 +- cli/pom.xml | 5 + .../java/org/apache/iotdb/cli/AbstractCli.java | 17 +- cli/src/main/java/org/apache/iotdb/cli/Cli.java | 3 +- cli/src/main/java/org/apache/iotdb/cli/WinCli.java | 13 +- .../org/apache/iotdb/tool/AbstractCsvTool.java | 96 ++- .../main/java/org/apache/iotdb/tool/ExportCsv.java | 240 +++---- .../main/java/org/apache/iotdb/tool/ImportCsv.java | 789 +++++++++++++-------- .../java/org/apache/iotdb/cli/AbstractScript.java | 10 +- .../org/apache/iotdb/cli/StartClientScriptIT.java | 27 + .../tool/{ => integration}/ExportCsvTestIT.java | 2 +- .../tool/{ => integration}/ImportCsvTestIT.java | 2 +- .../apache/iotdb/tool/unit/WriteCsvFileTestUT.java | 46 ++ client-cpp/pom.xml | 18 + client-cpp/src/main/Session.cpp | 12 +- client-cpp/src/main/Session.h | 4 +- .../resources/conf/iotdb-cluster.properties | 13 +- .../client/sync/SyncDataHeartbeatClient.java | 2 +- .../client/sync/SyncMetaHeartbeatClient.java | 3 +- .../apache/iotdb/cluster/config/ClusterConfig.java | 20 + .../iotdb/cluster/config/ClusterConstant.java | 61 +- .../iotdb/cluster/config/ClusterDescriptor.java | 10 + .../iotdb/cluster/log/applier/BaseApplier.java | 94 ++- .../iotdb/cluster/log/applier/DataLogApplier.java | 66 +- .../manage/FilePartitionedSnapshotLogManager.java | 12 +- .../log/manage/PartitionedSnapshotLogManager.java | 5 +- .../iotdb/cluster/log/manage/RaftLogManager.java | 115 +-- .../serializable/SyncLogDequeSerializer.java | 9 +- .../apache/iotdb/cluster/metadata/CMManager.java | 79 +-- .../partition/slot/SlotTimePartitionFilter.java | 55 ++ .../cluster/query/ClusterDataQueryExecutor.java | 2 +- .../iotdb/cluster/query/ClusterPlanExecutor.java | 23 +- .../iotdb/cluster/query/ClusterPlanRouter.java | 36 + .../iotdb/cluster/query/ClusterQueryRouter.java | 14 +- .../iotdb/cluster/query/LocalQueryExecutor.java | 6 +- .../query/dataset/ClusterAlignByDeviceDataSet.java | 61 -- .../cluster/query/fill/ClusterFillExecutor.java | 53 +- .../cluster/query/reader/ClusterReaderFactory.java | 12 +- .../query/reader/mult/RemoteMultSeriesReader.java | 3 +- .../cluster/server/heartbeat/HeartbeatThread.java | 50 +- .../cluster/server/member/DataGroupMember.java | 113 ++- .../cluster/server/member/MetaGroupMember.java | 2 +- .../iotdb/cluster/server/member/RaftMember.java | 2 +- .../cluster/log/applier/DataLogApplierTest.java | 39 +- .../query/{ => fill}/ClusterFillExecutorTest.java | 69 +- .../server/heartbeat/HeartbeatThreadTest.java | 5 +- .../iotdb/cluster/server/member/BaseMember.java | 14 +- .../cluster/server/member/MetaGroupMemberTest.java | 6 +- compile-tools/pom.xml | 24 + cross-tests/pom.xml | 9 + .../tests/tools/importCsv/AbstractScript.java | 54 +- .../tests/tools/importCsv/ExportCsvTestIT.java | 209 ++---- .../tests/tools/importCsv/ImportCsvTestIT.java | 383 ++++++---- distribution/pom.xml | 2 +- docker/src/main/Dockerfile-0.12.2-cluster | 53 ++ docker/src/main/Dockerfile-0.12.2-node | 45 ++ docs/Download/README.md | 34 +- docs/UserGuide/Cluster/Cluster-Setup-Example.md | 2 +- docs/UserGuide/Cluster/Cluster-Setup.md | 21 +- docs/UserGuide/Data-Concept/Data-Type.md | 14 +- docs/UserGuide/Data-Concept/Encoding.md | 2 +- .../DDL-Data-Definition-Language.md | 2 +- docs/UserGuide/System-Tools/CSV-Tool.md | 204 ++++-- .../UserGuide/System-Tools/Load-External-Tsfile.md | 10 +- docs/zh/Download/README.md | 34 +- docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md | 2 +- docs/zh/UserGuide/Cluster/Cluster-Setup.md | 22 +- docs/zh/UserGuide/Data-Concept/Data-Type.md | 14 +- docs/zh/UserGuide/Data-Concept/Encoding.md | 4 +- .../DDL-Data-Definition-Language.md | 2 +- docs/zh/UserGuide/System-Tools/CSV-Tool.md | 205 ++++-- .../UserGuide/System-Tools/Load-External-Tsfile.md | 10 +- example/client-cpp-example/pom.xml | 19 +- example/client-cpp-example/src/SessionExample.cpp | 15 +- .../iotdb/AlignedTimeseriesSessionExample.java | 6 +- .../iotdb/HybridTimeseriesSessionExample.java | 5 +- .../iotdb/tsfile/TsFileWriteVectorWithTablet.java | 2 +- .../apache/iotdb/jdbc/IoTDBPreparedStatement.java | 4 +- pom.xml | 4 +- server/src/assembly/resources/conf/iotdb-env.sh | 4 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../org/apache/iotdb/db/conf/OperationType.java | 65 ++ .../org/apache/iotdb/db/engine/StorageEngine.java | 23 +- .../db/engine/cache/TimeSeriesMetadataCache.java | 121 ++-- .../iotdb/db/engine/memtable/AbstractMemTable.java | 21 +- .../db/engine/memtable/PrimitiveMemTable.java | 3 +- .../db/engine/merge/manage/MergeResource.java | 4 +- .../db/engine/querycontext/ReadOnlyMemChunk.java | 19 +- .../engine/storagegroup/StorageGroupProcessor.java | 187 ++--- .../db/engine/storagegroup/TsFileProcessor.java | 21 +- .../virtualSg/VirtualStorageGroupManager.java | 9 +- .../org/apache/iotdb/db/metadata/MManager.java | 356 ++++++---- .../java/org/apache/iotdb/db/metadata/MTree.java | 110 +-- .../org/apache/iotdb/db/metadata/PartialPath.java | 21 +- .../iotdb/db/metadata/VectorPartialPath.java | 68 +- .../db/metadata/lastCache/LastCacheManager.java | 331 +++++++++ .../container/ILastCacheContainer.java} | 56 +- .../lastCache/container/LastCacheContainer.java | 118 +++ .../lastCache/container/value/ILastCacheValue.java | 36 +- .../container/value/UnaryLastCacheValue.java | 106 +++ .../container/value/VectorLastCacheValue.java | 86 +++ .../iotdb/db/metadata/mnode/EntityMNode.java | 25 + .../iotdb/db/metadata/mnode/IEntityMNode.java | 6 + .../iotdb/db/metadata/mnode/IMeasurementMNode.java | 9 +- .../org/apache/iotdb/db/metadata/mnode/MNode.java | 2 +- .../iotdb/db/metadata/mnode/MeasurementMNode.java | 51 +- .../apache/iotdb/db/metadata/tag/TagManager.java | 8 +- .../iotdb/db/metadata/template/Template.java | 2 +- .../apache/iotdb/db/qp/constant/SQLConstant.java | 4 +- .../apache/iotdb/db/qp/executor/IPlanExecutor.java | 9 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 50 +- .../org/apache/iotdb/db/qp/logical/Operator.java | 2 +- .../iotdb/db/qp/logical/crud/LikeOperator.java | 4 + .../iotdb/db/qp/logical/crud/QueryOperator.java | 16 +- .../iotdb/db/qp/logical/crud/RegexpOperator.java | 4 + .../db/qp/logical/sys/CreateFunctionOperator.java | 11 +- .../db/qp/logical/sys/ShowFunctionsOperator.java | 12 +- ...veFileOperator.java => UnloadFileOperator.java} | 10 +- .../org/apache/iotdb/db/qp/physical/BatchPlan.java | 9 + .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 4 + .../iotdb/db/qp/physical/crud/DeletePlan.java | 15 + .../db/qp/physical/crud/InsertMultiTabletPlan.java | 19 +- .../iotdb/db/qp/physical/crud/InsertRowPlan.java | 11 +- .../physical/crud/InsertRowsOfOneDevicePlan.java | 89 ++- .../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 15 +- .../iotdb/db/qp/physical/crud/LastQueryPlan.java | 2 - .../iotdb/db/qp/physical/crud/QueryPlan.java | 2 +- .../db/qp/physical/crud/RawDataQueryPlan.java | 30 +- .../db/qp/physical/sys/CreateFunctionPlan.java | 18 +- .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 12 +- .../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 15 + .../db/qp/physical/sys/ShowFunctionsPlan.java | 9 +- .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 14 +- .../qp/strategy/optimizer/ConcatPathOptimizer.java | 14 + .../iotdb/db/query/control/QueryTimeManager.java | 25 +- .../db/query/dataset/AlignByDeviceDataSet.java | 54 +- .../dataset/RawQueryDataSetWithoutValueFilter.java | 6 +- .../iotdb/db/query/executor/FillQueryExecutor.java | 94 ++- .../iotdb/db/query/executor/LastQueryExecutor.java | 34 +- .../iotdb/db/query/executor/QueryRouter.java | 28 +- .../db/query/executor/fill/LastPointReader.java | 13 +- .../query/expression/unary/TimeSeriesOperand.java | 2 +- .../query/timegenerator/ServerTimeGenerator.java | 7 - .../udf/service/UDFRegistrationInformation.java | 13 +- .../query/udf/service/UDFRegistrationService.java | 58 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 146 +++- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 34 +- .../java/org/apache/iotdb/db/utils/MemUtils.java | 36 +- .../iotdb/db/writelog/recover/LogReplayer.java | 4 +- .../engine/modification/DeletionFileNodeTest.java | 109 ++- .../db/engine/modification/DeletionQueryTest.java | 77 +- .../storagegroup/StorageGroupProcessorTest.java | 2 +- .../iotdb/db/engine/storagegroup/TTLTest.java | 4 +- .../apache/iotdb/db/integration/IoTDBFillIT.java | 34 +- .../{IoTDBLikeIT.java => IoTDBFuzzyQueryIT.java} | 125 +++- .../apache/iotdb/db/integration/IoTDBLastIT.java | 28 +- .../db/integration/IoTDBLoadExternalTsfileIT.java | 26 +- .../iotdb/db/integration/IoTDBUDFManagementIT.java | 104 +-- .../aggregation/IoTDBAggregationSmallDataIT.java | 17 +- .../iotdb/db/metadata/MManagerAdvancedTest.java | 16 +- .../qp/physical/InsertRowsOfOneDevicePlanTest.java | 75 ++ .../iotdb/db/qp/physical/PhysicalPlanTest.java | 44 +- .../java/org/apache/iotdb/session/Session.java | 21 +- .../org/apache/iotdb/session/SessionUtils.java | 4 +- .../apache/iotdb/session/IoTDBSessionSimpleIT.java | 286 +++++++- .../apache/iotdb/session/IoTDBSessionVectorIT.java | 5 +- .../.vuepress/public/img/contributor-avatar/cw.jpg | Bin 163226 -> 163225 bytes .../test/java/org/apache/iotdb/db/sql/Cases.java | 304 +++++++- .../java/org/apache/iotdb/db/sql/ClusterIT.java | 9 +- .../tsfile/write/chunk/ChunkGroupWriterImpl.java | 2 +- .../tsfile/write/chunk/VectorChunkWriterImpl.java | 8 +- .../apache/iotdb/tsfile/write/record/Tablet.java | 16 +- .../tsfile/write/schema/IMeasurementSchema.java | 14 +- .../tsfile/write/schema/MeasurementSchema.java | 14 +- .../write/schema/VectorMeasurementSchema.java | 96 +-- .../write/writer/VectorMeasurementSchemaStub.java | 14 +- 182 files changed, 5460 insertions(+), 2604 deletions(-) diff --cc cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java index 3a332ef,38820fc..0629f9b --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java @@@ -37,24 -39,19 +37,24 @@@ public class SyncDataHeartbeatClient ex throws TTransportException { // the difference of the two clients lies in the port super( - protocolFactory.getProtocol( - RpcTransportFactory.INSTANCE.getTransport( - new TSocket( - TConfigurationConst.defaultTConfiguration, - node.getInternalIp(), - node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET, - RaftServer.getHeartbeatClientConnTimeoutMs())))); - this.node = node; - this.pool = pool; - getInputProtocol().getTransport().open(); + protocolFactory, + target.getInternalIp(), + target.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET, - ClusterConstant.getConnectionTimeoutInMS(), ++ ClusterConstant.getHeartbeatClientConnTimeoutMs(), + target, + pool); } - public static class FactorySync implements SyncClientFactory { + @Override + public String toString() { + return String.format( + "SyncDataHBClient (ip = %s, port = %d, id = %d)", + target.getInternalIp(), + target.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET, + target.getNodeIdentifier()); + } + + public static class Factory implements SyncClientFactory { private TProtocolFactory protocolFactory; diff --cc cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java index cbd4363,5972259..4d9514c --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java @@@ -30,31 -33,24 +30,30 @@@ import org.apache.thrift.transport.TTra * Notice: Because a client will be returned to a pool immediately after a successful request, you * should not cache it anywhere else or there may be conflicts. */ -public class SyncMetaHeartbeatClient extends SyncMetaClient { +public class SyncMetaHeartbeatClient extends TSMetaServiceClient { - private SyncMetaHeartbeatClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool) + private SyncMetaHeartbeatClient( + TProtocolFactory protocolFactory, Node target, SyncClientPool pool) throws TTransportException { - // the difference of the two clients lies in the port super( - protocolFactory.getProtocol( - RpcTransportFactory.INSTANCE.getTransport( - new TSocket( - TConfigurationConst.defaultTConfiguration, - node.getInternalIp(), - node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET, - RaftServer.getHeartbeatClientConnTimeoutMs())))); - this.node = node; - this.pool = pool; - getInputProtocol().getTransport().open(); + protocolFactory, + target.getInternalIp(), + target.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET, - ClusterConstant.getConnectionTimeoutInMS(), ++ ClusterConstant.getHeartbeatClientConnTimeoutMs(), + target, + pool); } - public static class FactorySync implements SyncClientFactory { + @Override + public String toString() { + return String.format( + "SyncMetaHBClient (ip = %s, port = %d, id = %d)", + target.getInternalIp(), + target.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET, + target.getNodeIdentifier()); + } + + public static class Factory implements SyncClientFactory { private TProtocolFactory protocolFactory; diff --cc cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java index 5efb1c7,8f049c5..c99327a --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java @@@ -24,12 -24,13 +24,21 @@@ import org.apache.iotdb.db.utils.TestOn public class ClusterConstant { /** - * We only change the two values in tests to reduce test time, so they are essentially constant. + * We only change the value in tests to reduce test time, so they are essentially constant. A + * failed election will restart in [0, max(heartbeatInterval, 50ms)). If this range is too small, + * a stale node may frequently issue elections and thus makes the leader step down. */ - private static long electionLeastTimeOutMs = 2 * 1000L; + private static long electionMaxWaitMs = + Math.max(ClusterDescriptor.getInstance().getConfig().getHeartbeatIntervalMs(), 50L); - private static long electionRandomTimeOutMs = 3 * 1000L; ++ // Heartbeat client connection timeout should not be larger than heartbeat interval, otherwise ++ // the thread pool of sending heartbeats or requesting votes may be used up by waiting for ++ // establishing connection with some slow or dead nodes. ++ private static final int heartbeatClientConnTimeoutMs = ++ Math.min( ++ (int) ClusterConstant.getHeartbeatIntervalMs(), ++ ClusterConstant.getConnectionTimeoutInMS()); + public static final int SLOT_NUM = 10000; public static final int HASH_SALT = 2333; public static final int CHECK_ALIVE_TIME_OUT_MS = 1000; @@@ -76,58 -52,7 +72,73 @@@ } @TestOnly - public static void setElectionRandomTimeOutMs(long electionRandomTimeOutMs) { - ClusterConstant.electionRandomTimeOutMs = electionRandomTimeOutMs; + public static void setElectionMaxWaitMs(long electionMaxWaitMs) { + ClusterConstant.electionMaxWaitMs = electionMaxWaitMs; } + + private static int connectionTimeoutInMS = + ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS(); + private static int readOperationTimeoutMS = + ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS(); + private static int writeOperationTimeoutMS = + ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(); + private static int syncLeaderMaxWaitMs = 20 * 1000; - private static long heartBeatIntervalMs = 1000L; ++ private static long heartbeatIntervalMs = ++ ClusterDescriptor.getInstance().getConfig().getHeartbeatIntervalMs(); ++ private static long electionTimeoutMs = ++ ClusterDescriptor.getInstance().getConfig().getElectionTimeoutMs(); + + public static int getConnectionTimeoutInMS() { + return connectionTimeoutInMS; + } + + public static void setConnectionTimeoutInMS(int connectionTimeoutInMS) { + ClusterConstant.connectionTimeoutInMS = connectionTimeoutInMS; + } + + public static int getReadOperationTimeoutMS() { + return readOperationTimeoutMS; + } + + public static int getWriteOperationTimeoutMS() { + return writeOperationTimeoutMS; + } + + public static int getSyncLeaderMaxWaitMs() { + return syncLeaderMaxWaitMs; + } + + public static void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) { + ClusterConstant.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs; + } + - public static long getHeartBeatIntervalMs() { - return heartBeatIntervalMs; ++ public static long getHeartbeatIntervalMs() { ++ return heartbeatIntervalMs; ++ } ++ ++ public static void setHeartbeatIntervalMs(long heartBeatIntervalMs) { ++ ClusterConstant.heartbeatIntervalMs = heartBeatIntervalMs; ++ } ++ ++ public static long getElectionTimeoutMs() { ++ return electionTimeoutMs; ++ } ++ ++ public static void setElectionTimeoutMs(long electionTimeoutMs) { ++ ClusterConstant.electionTimeoutMs = electionTimeoutMs; + } + - public static void setHeartBeatIntervalMs(long heartBeatIntervalMs) { - ClusterConstant.heartBeatIntervalMs = heartBeatIntervalMs; ++ public static int getHeartbeatClientConnTimeoutMs() { ++ return heartbeatClientConnTimeoutMs; + } + + @TestOnly + public static void setReadOperationTimeoutMS(int readOperationTimeoutMS) { + ClusterConstant.readOperationTimeoutMS = readOperationTimeoutMS; + } + + @TestOnly + public static void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) { + ClusterConstant.writeOperationTimeoutMS = writeOperationTimeoutMS; + } } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java index f22a731,e80fc1e..b43ee85 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java @@@ -20,10 -20,9 +20,9 @@@ package org.apache.iotdb.cluster.query.reader.mult; import org.apache.iotdb.cluster.client.sync.SyncDataClient; +import org.apache.iotdb.cluster.config.ClusterConstant; import org.apache.iotdb.cluster.config.ClusterDescriptor; -import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; - import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java index eb516b3,bc5e8d0..a455290 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java @@@ -86,15 -84,17 +84,18 @@@ public class HeartbeatThread implement // send heartbeats to the followers sendHeartbeats(); synchronized (localMember.getHeartBeatWaitObject()) { - localMember.getHeartBeatWaitObject().wait(ClusterConstant.getHeartBeatIntervalMs()); - localMember.getHeartBeatWaitObject().wait(RaftServer.getHeartbeatIntervalMs()); ++ localMember.getHeartBeatWaitObject().wait(ClusterConstant.getHeartbeatIntervalMs()); } hasHadLeader = true; break; case FOLLOWER: // check if heartbeat times out - long heartBeatInterval = + long heartbeatInterval = System.currentTimeMillis() - localMember.getLastHeartbeatReceivedTime(); - if (heartBeatInterval >= ClusterConstant.getConnectionTimeoutInMS()) { ++ + long randomElectionTimeout = - RaftServer.getElectionTimeoutMs() + getElectionRandomWaitMs(); ++ ClusterConstant.getElectionTimeoutMs() + getElectionRandomWaitMs(); + if (heartbeatInterval >= randomElectionTimeout) { // the leader is considered dead, an election will be started in the next loop logger.info("{}: The leader {} timed out", memberName, localMember.getLeader()); localMember.setCharacter(NodeCharacter.ELECTOR); @@@ -231,19 -234,16 +235,17 @@@ HeartBeatResponse heartBeatResponse = client.sendHeartbeat(req); heartbeatHandler.onComplete(heartBeatResponse); } catch (TTransportException e) { - logger.warn( - memberName - + ": Cannot send heartbeat to node " - + node.toString() - + " due to network", - e); + if (ClusterIoTDB.printClientConnectionErrorStack) { + logger.warn( - "{}: Cannot send heart beat to node {} due to network", - memberName, - node, - e); ++ "{}: Cannot send heartbeat to node {} due to network", memberName, node, e); + } else { + logger.warn( - "{}: Cannot send heart beat to node {} due to network", memberName, node); ++ "{}: Cannot send heartbeat to node {} due to network", memberName, node); + } client.getInputProtocol().getTransport().close(); } catch (Exception e) { - logger.warn("{}: Cannot send heart beat to node {}", memberName, node, e); + logger.warn( + memberName + ": Cannot send heart beat to node " + node.toString(), e); } finally { ClientUtils.putBackSyncHeartbeatClient(client); } @@@ -331,8 -329,8 +331,8 @@@ logger.info( "{}: Wait for {}ms until election time out", memberName, - ClusterConstant.getConnectionTimeoutInMS()); - localMember.getTerm().wait(ClusterConstant.getConnectionTimeoutInMS()); - RaftServer.getElectionTimeoutMs()); - localMember.getTerm().wait(RaftServer.getElectionTimeoutMs()); ++ ClusterConstant.getElectionTimeoutMs()); ++ localMember.getTerm().wait(ClusterConstant.getElectionTimeoutMs()); } catch (InterruptedException e) { logger.info( "{}: Unexpected interruption when waiting the result of election {}", diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index a927bb9,535a86c..dbad251 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@@ -197,35 -179,33 +198,36 @@@ public class DataGroupMember extends Ra lastAppliedPartitionTableVersion = new LastAppliedPatitionTableVersion(getMemberDir()); } - DataGroupMember( - TProtocolFactory factory, - PartitionGroup nodes, - Node thisNode, - MetaGroupMember metaGroupMember) { + DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, MetaGroupMember metaGroupMember) { + // The name is used in JMX, so we have to avoid to use "(" "," "=" ")" super( - "Data(" + "Data-" + nodes.getHeader().getNode().getInternalIp() - + ":" - + nodes.getHeader().getNode().getMetaPort() - + ", raftId=" + + "-" + + nodes.getHeader().getNode().getDataPort() + + "-raftId-" + nodes.getId() - + ")", - new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)), - new SyncClientPool(new SyncDataClient.FactorySync(factory)), - new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)), - new SyncClientPool(new SyncDataHeartbeatClient.FactorySync(factory)), + + "", + new AsyncClientPool(new AsyncDataClient.Factory(factory)), + new SyncClientPool(new SyncDataClient.Factory(factory)), + new AsyncClientPool(new AsyncDataHeartbeatClient.Factory(factory)), + new SyncClientPool(new SyncDataHeartbeatClient.Factory(factory)), new AsyncClientPool(new SingleManagerFactory(factory))); - this.thisNode = thisNode; this.metaGroupMember = metaGroupMember; allNodes = nodes; + mbeanName = + String.format( + "%s:%s=%s%d", + "org.apache.iotdb.cluster.service", + IoTDBConstant.JMX_TYPE, + "DataMember", + getRaftGroupId()); setQueryManager(new ClusterQueryManager()); slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(), getName()); - LogApplier applier = new DataLogApplier(metaGroupMember, this); - if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()) { - applier = new AsyncDataLogApplier(applier, name); + dataLogApplier = new DataLogApplier(metaGroupMember, this); + if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier() + && ClusterDescriptor.getInstance().getConfig().getReplicationNum() != 1) { + dataLogApplier = new AsyncDataLogApplier(dataLogApplier, name); } logManager = new FilePartitionedSnapshotLogManager( diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 04010a1,3da9dd8..5d71544 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@@ -1812,7 -1807,7 +1812,7 @@@ public class MetaGroupMember extends Ra new Thread( () -> { try { - Thread.sleep(ClusterConstant.getHeartBeatIntervalMs()); - Thread.sleep(RaftServer.getHeartbeatIntervalMs()); ++ Thread.sleep(ClusterConstant.getHeartbeatIntervalMs()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // ignore diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 1f448d5,774e269..76a08b8 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@@ -986,7 -981,7 +986,7 @@@ public abstract class RaftMember implem // the node may have some inconsistent logs with the leader waitedTime = System.currentTimeMillis() - startTime; synchronized (syncLock) { - syncLock.wait(ClusterConstant.getHeartBeatIntervalMs()); - syncLock.wait(RaftServer.getHeartbeatIntervalMs()); ++ syncLock.wait(ClusterConstant.getHeartbeatIntervalMs()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --cc cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java index 45881f0,0831b51..4fe7f29 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java @@@ -101,7 -101,7 +103,8 @@@ public class DataLogApplierTest extend private static final Logger logger = LoggerFactory.getLogger(DataLogApplierTest.class); private boolean partialWriteEnabled; + private boolean isPartitionEnabled; + private TestMetaGroupMember testMetaGroupMember = new TestMetaGroupMember() { @Override @@@ -176,77 -177,80 +182,80 @@@ NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember); partialWriteEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert(); IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false); + isPartitionEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartition(); - IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true); - testMetaGroupMember.setClientProvider( - new DataClientProvider(new Factory()) { - @Override - public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException { - return new AsyncDataClient(null, null, node, null) { + // //TODO fixme : 恢复正常的provider + ClusterIoTDB.getInstance() + .setClientProvider( + new DataClientProvider(new Factory()) { @Override - public void getAllPaths( - RaftNode header, - List<String> path, - boolean withAlias, - AsyncMethodCallback<GetAllPathsResult> resultHandler) { - new Thread( - () -> - new DataAsyncService(testDataGroupMember) - .getAllPaths(header, path, withAlias, resultHandler)) - .start(); - } + public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException { + return new AsyncDataClient(null, null, node, null) { + @Override + public void getAllPaths( + RaftNode header, + List<String> path, + boolean withAlias, + AsyncMethodCallback<GetAllPathsResult> resultHandler) { + new Thread( + () -> + new DataAsyncService(testDataGroupMember) + .getAllPaths(header, path, withAlias, resultHandler)) + .start(); + } - @Override - public void pullTimeSeriesSchema( - PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { - new Thread( - () -> { - List<TimeseriesSchema> timeseriesSchemas = new ArrayList<>(); - for (String path : request.prefixPaths) { - if (path.startsWith(TestUtils.getTestSg(4))) { - for (int i = 0; i < 10; i++) { - timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(4, i)); + @Override + public void pullTimeSeriesSchema( + PullSchemaRequest request, + AsyncMethodCallback<PullSchemaResp> resultHandler) { + new Thread( + () -> { + List<TimeseriesSchema> timeseriesSchemas = new ArrayList<>(); + for (String path : request.prefixPaths) { + if (path.startsWith(TestUtils.getTestSg(4))) { + for (int i = 0; i < 10; i++) { + timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(4, i)); + } + } else if (!path.startsWith(TestUtils.getTestSg(5))) { + resultHandler.onError(new StorageGroupNotSetException(path)); + return; + } } - } else if (path.startsWith(TestUtils.getTestSg(1)) - || path.startsWith(TestUtils.getTestSg(2)) - || path.startsWith(TestUtils.getTestSg(3))) { - // do nothing - } else if (!path.startsWith(TestUtils.getTestSg(5))) { - resultHandler.onError(new StorageGroupNotSetException(path)); - return; - } - } - PullSchemaResp resp = new PullSchemaResp(); - // serialize the schemas - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = - new DataOutputStream(byteArrayOutputStream); - try { - dataOutputStream.writeInt(timeseriesSchemas.size()); - for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) { - timeseriesSchema.serializeTo(dataOutputStream); - } - } catch (IOException ignored) { - // unreachable for we are using a ByteArrayOutputStream - } - resp.setSchemaBytes(byteArrayOutputStream.toByteArray()); - resultHandler.onComplete(resp); - }) - .start(); - } + PullSchemaResp resp = new PullSchemaResp(); + // serialize the schemas + ByteArrayOutputStream byteArrayOutputStream = + new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = + new DataOutputStream(byteArrayOutputStream); + try { + dataOutputStream.writeInt(timeseriesSchemas.size()); + for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) { + timeseriesSchema.serializeTo(dataOutputStream); + } + } catch (IOException ignored) { + // unreachable for we are using a ByteArrayOutputStream + } + resp.setSchemaBytes(byteArrayOutputStream.toByteArray()); + resultHandler.onComplete(resp); + }) + .start(); + } - @Override - public void pullMeasurementSchema( - PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { - new Thread( - () -> - new DataAsyncService(testDataGroupMember) - .pullMeasurementSchema(request, resultHandler)) - .start(); + @Override + public void pullMeasurementSchema( + PullSchemaRequest request, + AsyncMethodCallback<PullSchemaResp> resultHandler) { + new Thread( + () -> + new DataAsyncService(testDataGroupMember) + .pullMeasurementSchema(request, resultHandler)) + .start(); + } + }; } - }; - } - }); + }); ((CMManager) IoTDB.metaManager).setMetaGroupMember(testMetaGroupMember); + testDataGroupMember.setMetaGroupMember(testMetaGroupMember); + applier = new DataLogApplier(testMetaGroupMember, testDataGroupMember); } @Override diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java index 13076a5,23c78b6..512e67b --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java @@@ -147,8 -148,9 +147,9 @@@ public class HeartbeatThreadTest @Before public void setUp() throws Exception { - ClusterConstant.setElectionLeastTimeOutMs(20); - ClusterConstant.setElectionRandomTimeOutMs(30); + ClusterConstant.setElectionMaxWaitMs(50L); - RaftServer.setHeartbeatIntervalMs(100L); - RaftServer.setElectionTimeoutMs(1000L); ++ ClusterConstant.setHeartbeatIntervalMs(100L); ++ ClusterConstant.setElectionTimeoutMs(1000L); prevUseAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(); ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true); logManager = new TestLogManager(1); diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java index 44c58fb,d8cae9d..aa3e0f6 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java @@@ -111,11 -111,13 +112,17 @@@ public class BaseMember IoTDBDescriptor.getInstance().getConfig().setEnableWal(false); RaftMember.setWaitLeaderTimeMs(10); - syncLeaderMaxWait = RaftServer.getSyncLeaderMaxWaitMs(); - heartBeatInterval = RaftServer.getHeartbeatIntervalMs(); - electionTimeout = RaftServer.getElectionTimeoutMs(); + syncLeaderMaxWait = ClusterConstant.getSyncLeaderMaxWaitMs(); - heartBeatInterval = ClusterConstant.getHeartBeatIntervalMs(); ++ heartBeatInterval = ClusterConstant.getHeartbeatIntervalMs(); ++ electionTimeout = ClusterConstant.getElectionTimeoutMs(); - RaftServer.setSyncLeaderMaxWaitMs(100); - RaftServer.setHeartbeatIntervalMs(100); - RaftServer.setElectionTimeoutMs(1000); + ClusterConstant.setSyncLeaderMaxWaitMs(100); - ClusterConstant.setHeartBeatIntervalMs(100); ++ ClusterConstant.setHeartbeatIntervalMs(100); ++ ClusterConstant.setElectionTimeoutMs(1000); ++ ++ electionTimeout = ClusterConstant.getElectionTimeoutMs(); ++ ++ ClusterConstant.setElectionTimeoutMs(1000); allNodes = new PartitionGroup(); for (int i = 0; i < 100; i += 10) { @@@ -192,8 -194,9 +199,9 @@@ ClusterDescriptor.getInstance().getConfig().setUseAsyncApplier(prevUseAsyncApplier); IoTDBDescriptor.getInstance().getConfig().setEnableWal(prevEnableWAL); - RaftServer.setSyncLeaderMaxWaitMs(syncLeaderMaxWait); - RaftServer.setHeartbeatIntervalMs(heartBeatInterval); - RaftServer.setElectionTimeoutMs(electionTimeout); + ClusterConstant.setSyncLeaderMaxWaitMs(syncLeaderMaxWait); - ClusterConstant.setHeartBeatIntervalMs(heartBeatInterval); ++ ClusterConstant.setHeartbeatIntervalMs(heartBeatInterval); ++ ClusterConstant.setElectionTimeoutMs(electionTimeout); } DataGroupMember getDataGroupMember(Node node) { diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 016a722,56f0bd8..33aee68 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@@ -691,8 -696,8 +691,8 @@@ public class MetaGroupMemberTest extend @Test public void testJoinClusterFailed() throws QueryProcessException { System.out.println("Start testJoinClusterFailed()"); - long prevInterval = ClusterConstant.getHeartBeatIntervalMs(); - ClusterConstant.setHeartBeatIntervalMs(10); - long prevInterval = RaftServer.getHeartbeatIntervalMs(); - RaftServer.setHeartbeatIntervalMs(10); ++ long prevInterval = ClusterConstant.getHeartbeatIntervalMs(); ++ ClusterConstant.setHeartbeatIntervalMs(10); ClusterDescriptor.getInstance().getConfig().setJoinClusterTimeOutMs(100); dummyResponse.set(Response.RESPONSE_NO_CONNECTION); MetaGroupMember newMember = getMetaGroupMember(TestUtils.getNode(10)); @@@ -703,7 -708,7 +703,7 @@@ assertTrue(e instanceof StartUpCheckFailureException); } finally { newMember.closeLogManager(); - ClusterConstant.setHeartBeatIntervalMs(prevInterval); - RaftServer.setHeartbeatIntervalMs(prevInterval); ++ ClusterConstant.setHeartbeatIntervalMs(prevInterval); } }
