This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 06cbe01bb9d42c47fbf0dc655218dcc3dd5dc8c8
Merge: 9fdbbad 6dc45c6
Author: xiangdong huang <[email protected]>
AuthorDate: Thu Sep 23 19:43:01 2021 +0800

    merge with master

 .github/workflows/main-unix.yml                    |   2 +-
 .github/workflows/main-win.yml                     |   2 +-
 README.md                                          |   2 +-
 README_ZH.md                                       |   2 +-
 RELEASE_NOTES.md                                   |   2 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |  20 +-
 checkstyle.xml                                     |   2 +-
 cli/pom.xml                                        |   5 +
 .../java/org/apache/iotdb/cli/AbstractCli.java     |  17 +-
 cli/src/main/java/org/apache/iotdb/cli/Cli.java    |   3 +-
 cli/src/main/java/org/apache/iotdb/cli/WinCli.java |  13 +-
 .../org/apache/iotdb/tool/AbstractCsvTool.java     |  96 ++-
 .../main/java/org/apache/iotdb/tool/ExportCsv.java | 240 +++----
 .../main/java/org/apache/iotdb/tool/ImportCsv.java | 789 +++++++++++++--------
 .../java/org/apache/iotdb/cli/AbstractScript.java  |  10 +-
 .../org/apache/iotdb/cli/StartClientScriptIT.java  |  27 +
 .../tool/{ => integration}/ExportCsvTestIT.java    |   2 +-
 .../tool/{ => integration}/ImportCsvTestIT.java    |   2 +-
 .../apache/iotdb/tool/unit/WriteCsvFileTestUT.java |  46 ++
 client-cpp/pom.xml                                 |  18 +
 client-cpp/src/main/Session.cpp                    |  12 +-
 client-cpp/src/main/Session.h                      |   4 +-
 .../resources/conf/iotdb-cluster.properties        |  13 +-
 .../client/sync/SyncDataHeartbeatClient.java       |   2 +-
 .../client/sync/SyncMetaHeartbeatClient.java       |   3 +-
 .../apache/iotdb/cluster/config/ClusterConfig.java |  20 +
 .../iotdb/cluster/config/ClusterConstant.java      |  61 +-
 .../iotdb/cluster/config/ClusterDescriptor.java    |  10 +
 .../iotdb/cluster/log/applier/BaseApplier.java     |  94 ++-
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  66 +-
 .../manage/FilePartitionedSnapshotLogManager.java  |  12 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |   5 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 115 +--
 .../serializable/SyncLogDequeSerializer.java       |   9 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  79 +--
 .../partition/slot/SlotTimePartitionFilter.java    |  55 ++
 .../cluster/query/ClusterDataQueryExecutor.java    |   2 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  23 +-
 .../iotdb/cluster/query/ClusterPlanRouter.java     |  36 +
 .../iotdb/cluster/query/ClusterQueryRouter.java    |  14 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |   6 +-
 .../query/dataset/ClusterAlignByDeviceDataSet.java |  61 --
 .../cluster/query/fill/ClusterFillExecutor.java    |  53 +-
 .../cluster/query/reader/ClusterReaderFactory.java |  12 +-
 .../query/reader/mult/RemoteMultSeriesReader.java  |   3 +-
 .../cluster/server/heartbeat/HeartbeatThread.java  |  50 +-
 .../cluster/server/member/DataGroupMember.java     | 113 ++-
 .../cluster/server/member/MetaGroupMember.java     |   2 +-
 .../iotdb/cluster/server/member/RaftMember.java    |   2 +-
 .../cluster/log/applier/DataLogApplierTest.java    |  39 +-
 .../query/{ => fill}/ClusterFillExecutorTest.java  |  69 +-
 .../server/heartbeat/HeartbeatThreadTest.java      |   5 +-
 .../iotdb/cluster/server/member/BaseMember.java    |  14 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   6 +-
 compile-tools/pom.xml                              |  24 +
 cross-tests/pom.xml                                |   9 +
 .../tests/tools/importCsv/AbstractScript.java      |  54 +-
 .../tests/tools/importCsv/ExportCsvTestIT.java     | 209 ++----
 .../tests/tools/importCsv/ImportCsvTestIT.java     | 383 ++++++----
 distribution/pom.xml                               |   2 +-
 docker/src/main/Dockerfile-0.12.2-cluster          |  53 ++
 docker/src/main/Dockerfile-0.12.2-node             |  45 ++
 docs/Download/README.md                            |  34 +-
 docs/UserGuide/Cluster/Cluster-Setup-Example.md    |   2 +-
 docs/UserGuide/Cluster/Cluster-Setup.md            |  21 +-
 docs/UserGuide/Data-Concept/Data-Type.md           |  14 +-
 docs/UserGuide/Data-Concept/Encoding.md            |   2 +-
 .../DDL-Data-Definition-Language.md                |   2 +-
 docs/UserGuide/System-Tools/CSV-Tool.md            | 204 ++++--
 .../UserGuide/System-Tools/Load-External-Tsfile.md |  10 +-
 docs/zh/Download/README.md                         |  34 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md |   2 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |  22 +-
 docs/zh/UserGuide/Data-Concept/Data-Type.md        |  14 +-
 docs/zh/UserGuide/Data-Concept/Encoding.md         |   4 +-
 .../DDL-Data-Definition-Language.md                |   2 +-
 docs/zh/UserGuide/System-Tools/CSV-Tool.md         | 205 ++++--
 .../UserGuide/System-Tools/Load-External-Tsfile.md |  10 +-
 example/client-cpp-example/pom.xml                 |  19 +-
 example/client-cpp-example/src/SessionExample.cpp  |  15 +-
 .../iotdb/AlignedTimeseriesSessionExample.java     |   6 +-
 .../iotdb/HybridTimeseriesSessionExample.java      |   5 +-
 .../iotdb/tsfile/TsFileWriteVectorWithTablet.java  |   2 +-
 .../apache/iotdb/jdbc/IoTDBPreparedStatement.java  |   4 +-
 pom.xml                                            |   4 +-
 server/src/assembly/resources/conf/iotdb-env.sh    |   4 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../org/apache/iotdb/db/conf/OperationType.java    |  65 ++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  23 +-
 .../db/engine/cache/TimeSeriesMetadataCache.java   | 121 ++--
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  21 +-
 .../db/engine/memtable/PrimitiveMemTable.java      |   3 +-
 .../db/engine/merge/manage/MergeResource.java      |   4 +-
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |  19 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 187 ++---
 .../db/engine/storagegroup/TsFileProcessor.java    |  21 +-
 .../virtualSg/VirtualStorageGroupManager.java      |   9 +-
 .../org/apache/iotdb/db/metadata/MManager.java     | 356 ++++++----
 .../java/org/apache/iotdb/db/metadata/MTree.java   | 110 +--
 .../org/apache/iotdb/db/metadata/PartialPath.java  |  21 +-
 .../iotdb/db/metadata/VectorPartialPath.java       |  68 +-
 .../db/metadata/lastCache/LastCacheManager.java    | 331 +++++++++
 .../container/ILastCacheContainer.java}            |  56 +-
 .../lastCache/container/LastCacheContainer.java    | 118 +++
 .../lastCache/container/value/ILastCacheValue.java |  36 +-
 .../container/value/UnaryLastCacheValue.java       | 106 +++
 .../container/value/VectorLastCacheValue.java      |  86 +++
 .../iotdb/db/metadata/mnode/EntityMNode.java       |  25 +
 .../iotdb/db/metadata/mnode/IEntityMNode.java      |   6 +
 .../iotdb/db/metadata/mnode/IMeasurementMNode.java |   9 +-
 .../org/apache/iotdb/db/metadata/mnode/MNode.java  |   2 +-
 .../iotdb/db/metadata/mnode/MeasurementMNode.java  |  51 +-
 .../apache/iotdb/db/metadata/tag/TagManager.java   |   8 +-
 .../iotdb/db/metadata/template/Template.java       |   2 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   4 +-
 .../apache/iotdb/db/qp/executor/IPlanExecutor.java |   9 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  50 +-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   2 +-
 .../iotdb/db/qp/logical/crud/LikeOperator.java     |   4 +
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |  16 +-
 .../iotdb/db/qp/logical/crud/RegexpOperator.java   |   4 +
 .../db/qp/logical/sys/CreateFunctionOperator.java  |  11 +-
 .../db/qp/logical/sys/ShowFunctionsOperator.java   |  12 +-
 ...veFileOperator.java => UnloadFileOperator.java} |  10 +-
 .../org/apache/iotdb/db/qp/physical/BatchPlan.java |   9 +
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   4 +
 .../iotdb/db/qp/physical/crud/DeletePlan.java      |  15 +
 .../db/qp/physical/crud/InsertMultiTabletPlan.java |  19 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  11 +-
 .../physical/crud/InsertRowsOfOneDevicePlan.java   |  89 ++-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  |  15 +-
 .../iotdb/db/qp/physical/crud/LastQueryPlan.java   |   2 -
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   2 +-
 .../db/qp/physical/crud/RawDataQueryPlan.java      |  30 +-
 .../db/qp/physical/sys/CreateFunctionPlan.java     |  18 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  12 +-
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |  15 +
 .../db/qp/physical/sys/ShowFunctionsPlan.java      |   9 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  14 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |  14 +
 .../iotdb/db/query/control/QueryTimeManager.java   |  25 +-
 .../db/query/dataset/AlignByDeviceDataSet.java     |  54 +-
 .../dataset/RawQueryDataSetWithoutValueFilter.java |   6 +-
 .../iotdb/db/query/executor/FillQueryExecutor.java |  94 ++-
 .../iotdb/db/query/executor/LastQueryExecutor.java |  34 +-
 .../iotdb/db/query/executor/QueryRouter.java       |  28 +-
 .../db/query/executor/fill/LastPointReader.java    |  13 +-
 .../query/expression/unary/TimeSeriesOperand.java  |   2 +-
 .../query/timegenerator/ServerTimeGenerator.java   |   7 -
 .../udf/service/UDFRegistrationInformation.java    |  13 +-
 .../query/udf/service/UDFRegistrationService.java  |  58 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 146 +++-
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |  34 +-
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |  36 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |   4 +-
 .../engine/modification/DeletionFileNodeTest.java  | 109 ++-
 .../db/engine/modification/DeletionQueryTest.java  |  77 +-
 .../storagegroup/StorageGroupProcessorTest.java    |   2 +-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |   4 +-
 .../apache/iotdb/db/integration/IoTDBFillIT.java   |  34 +-
 .../{IoTDBLikeIT.java => IoTDBFuzzyQueryIT.java}   | 125 +++-
 .../apache/iotdb/db/integration/IoTDBLastIT.java   |  28 +-
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |  26 +-
 .../iotdb/db/integration/IoTDBUDFManagementIT.java | 104 +--
 .../aggregation/IoTDBAggregationSmallDataIT.java   |  17 +-
 .../iotdb/db/metadata/MManagerAdvancedTest.java    |  16 +-
 .../qp/physical/InsertRowsOfOneDevicePlanTest.java |  75 ++
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |  44 +-
 .../java/org/apache/iotdb/session/Session.java     |  21 +-
 .../org/apache/iotdb/session/SessionUtils.java     |   4 +-
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java | 286 +++++++-
 .../apache/iotdb/session/IoTDBSessionVectorIT.java |   5 +-
 .../.vuepress/public/img/contributor-avatar/cw.jpg | Bin 163226 -> 163225 bytes
 .../test/java/org/apache/iotdb/db/sql/Cases.java   | 304 +++++++-
 .../java/org/apache/iotdb/db/sql/ClusterIT.java    |   9 +-
 .../tsfile/write/chunk/ChunkGroupWriterImpl.java   |   2 +-
 .../tsfile/write/chunk/VectorChunkWriterImpl.java  |   8 +-
 .../apache/iotdb/tsfile/write/record/Tablet.java   |  16 +-
 .../tsfile/write/schema/IMeasurementSchema.java    |  14 +-
 .../tsfile/write/schema/MeasurementSchema.java     |  14 +-
 .../write/schema/VectorMeasurementSchema.java      |  96 +--
 .../write/writer/VectorMeasurementSchemaStub.java  |  14 +-
 182 files changed, 5460 insertions(+), 2604 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index 3a332ef,38820fc..0629f9b
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@@ -37,24 -39,19 +37,24 @@@ public class SyncDataHeartbeatClient ex
        throws TTransportException {
      // the difference of the two clients lies in the port
      super(
 -        protocolFactory.getProtocol(
 -            RpcTransportFactory.INSTANCE.getTransport(
 -                new TSocket(
 -                    TConfigurationConst.defaultTConfiguration,
 -                    node.getInternalIp(),
 -                    node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
 -                    RaftServer.getHeartbeatClientConnTimeoutMs()))));
 -    this.node = node;
 -    this.pool = pool;
 -    getInputProtocol().getTransport().open();
 +        protocolFactory,
 +        target.getInternalIp(),
 +        target.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
-         ClusterConstant.getConnectionTimeoutInMS(),
++        ClusterConstant.getHeartbeatClientConnTimeoutMs(),
 +        target,
 +        pool);
    }
  
 -  public static class FactorySync implements SyncClientFactory {
 +  @Override
 +  public String toString() {
 +    return String.format(
 +        "SyncDataHBClient (ip = %s, port = %d, id = %d)",
 +        target.getInternalIp(),
 +        target.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
 +        target.getNodeIdentifier());
 +  }
 +
 +  public static class Factory implements SyncClientFactory {
  
      private TProtocolFactory protocolFactory;
  
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index cbd4363,5972259..4d9514c
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@@ -30,31 -33,24 +30,30 @@@ import org.apache.thrift.transport.TTra
   * Notice: Because a client will be returned to a pool immediately after a successful request, you
   * should not cache it anywhere else or there may be conflicts.
   */
 -public class SyncMetaHeartbeatClient extends SyncMetaClient {
 +public class SyncMetaHeartbeatClient extends TSMetaServiceClient {
  
 -  private SyncMetaHeartbeatClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
 +  private SyncMetaHeartbeatClient(
 +      TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
        throws TTransportException {
-     // the difference of the two clients lies in the port
      super(
 -        protocolFactory.getProtocol(
 -            RpcTransportFactory.INSTANCE.getTransport(
 -                new TSocket(
 -                    TConfigurationConst.defaultTConfiguration,
 -                    node.getInternalIp(),
 -                    node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
 -                    RaftServer.getHeartbeatClientConnTimeoutMs()))));
 -    this.node = node;
 -    this.pool = pool;
 -    getInputProtocol().getTransport().open();
 +        protocolFactory,
 +        target.getInternalIp(),
 +        target.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
-         ClusterConstant.getConnectionTimeoutInMS(),
++        ClusterConstant.getHeartbeatClientConnTimeoutMs(),
 +        target,
 +        pool);
    }
  
 -  public static class FactorySync implements SyncClientFactory {
 +  @Override
 +  public String toString() {
 +    return String.format(
 +        "SyncMetaHBClient (ip = %s, port = %d, id = %d)",
 +        target.getInternalIp(),
 +        target.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
 +        target.getNodeIdentifier());
 +  }
 +
 +  public static class Factory implements SyncClientFactory {
  
      private TProtocolFactory protocolFactory;
  
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 5efb1c7,8f049c5..c99327a
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@@ -24,12 -24,13 +24,21 @@@ import org.apache.iotdb.db.utils.TestOn
  public class ClusterConstant {
  
    /**
-    * We only change the two values in tests to reduce test time, so they are essentially constant.
+    * We only change the value in tests to reduce test time, so they are essentially constant. A
+    * failed election will restart in [0, max(heartbeatInterval, 50ms)). If this range is too small,
+    * a stale node may frequently issue elections and thus makes the leader step down.
     */
-   private static long electionLeastTimeOutMs = 2 * 1000L;
+   private static long electionMaxWaitMs =
+       Math.max(ClusterDescriptor.getInstance().getConfig().getHeartbeatIntervalMs(), 50L);
  
-   private static long electionRandomTimeOutMs = 3 * 1000L;
++  // Heartbeat client connection timeout should not be larger than heartbeat interval, otherwise
++  // the thread pool of sending heartbeats or requesting votes may be used up by waiting for
++  // establishing connection with some slow or dead nodes.
++  private static final int heartbeatClientConnTimeoutMs =
++      Math.min(
++          (int) ClusterConstant.getHeartbeatIntervalMs(),
++          ClusterConstant.getConnectionTimeoutInMS());
 +
    public static final int SLOT_NUM = 10000;
    public static final int HASH_SALT = 2333;
    public static final int CHECK_ALIVE_TIME_OUT_MS = 1000;
@@@ -76,58 -52,7 +72,73 @@@
    }
  
    @TestOnly
-   public static void setElectionRandomTimeOutMs(long electionRandomTimeOutMs) {
-     ClusterConstant.electionRandomTimeOutMs = electionRandomTimeOutMs;
+   public static void setElectionMaxWaitMs(long electionMaxWaitMs) {
+     ClusterConstant.electionMaxWaitMs = electionMaxWaitMs;
    }
 +
 +  private static int connectionTimeoutInMS =
 +      ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
 +  private static int readOperationTimeoutMS =
 +      ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
 +  private static int writeOperationTimeoutMS =
 +      ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
 +  private static int syncLeaderMaxWaitMs = 20 * 1000;
-   private static long heartBeatIntervalMs = 1000L;
++  private static long heartbeatIntervalMs =
++      ClusterDescriptor.getInstance().getConfig().getHeartbeatIntervalMs();
++  private static long electionTimeoutMs =
++      ClusterDescriptor.getInstance().getConfig().getElectionTimeoutMs();
 +
 +  public static int getConnectionTimeoutInMS() {
 +    return connectionTimeoutInMS;
 +  }
 +
 +  public static void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
 +    ClusterConstant.connectionTimeoutInMS = connectionTimeoutInMS;
 +  }
 +
 +  public static int getReadOperationTimeoutMS() {
 +    return readOperationTimeoutMS;
 +  }
 +
 +  public static int getWriteOperationTimeoutMS() {
 +    return writeOperationTimeoutMS;
 +  }
 +
 +  public static int getSyncLeaderMaxWaitMs() {
 +    return syncLeaderMaxWaitMs;
 +  }
 +
 +  public static void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) {
 +    ClusterConstant.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs;
 +  }
 +
-   public static long getHeartBeatIntervalMs() {
-     return heartBeatIntervalMs;
++  public static long getHeartbeatIntervalMs() {
++    return heartbeatIntervalMs;
++  }
++
++  public static void setHeartbeatIntervalMs(long heartBeatIntervalMs) {
++    ClusterConstant.heartbeatIntervalMs = heartBeatIntervalMs;
++  }
++
++  public static long getElectionTimeoutMs() {
++    return electionTimeoutMs;
++  }
++
++  public static void setElectionTimeoutMs(long electionTimeoutMs) {
++    ClusterConstant.electionTimeoutMs = electionTimeoutMs;
 +  }
 +
-   public static void setHeartBeatIntervalMs(long heartBeatIntervalMs) {
-     ClusterConstant.heartBeatIntervalMs = heartBeatIntervalMs;
++  public static int getHeartbeatClientConnTimeoutMs() {
++    return heartbeatClientConnTimeoutMs;
 +  }
 +
 +  @TestOnly
 +  public static void setReadOperationTimeoutMS(int readOperationTimeoutMS) {
 +    ClusterConstant.readOperationTimeoutMS = readOperationTimeoutMS;
 +  }
 +
 +  @TestOnly
 +  public static void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) {
 +    ClusterConstant.writeOperationTimeoutMS = writeOperationTimeoutMS;
 +  }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index f22a731,e80fc1e..b43ee85
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@@ -20,10 -20,9 +20,9 @@@
  package org.apache.iotdb.cluster.query.reader.mult;
  
  import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 +import org.apache.iotdb.cluster.config.ClusterConstant;
  import org.apache.iotdb.cluster.config.ClusterDescriptor;
 -import org.apache.iotdb.cluster.server.RaftServer;
  import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
- import org.apache.iotdb.db.metadata.PartialPath;
  import org.apache.iotdb.db.utils.SerializeUtils;
  import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  import org.apache.iotdb.tsfile.read.TimeValuePair;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index eb516b3,bc5e8d0..a455290
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@@ -86,15 -84,17 +84,18 @@@ public class HeartbeatThread implement
              // send heartbeats to the followers
              sendHeartbeats();
              synchronized (localMember.getHeartBeatWaitObject()) {
-               localMember.getHeartBeatWaitObject().wait(ClusterConstant.getHeartBeatIntervalMs());
 -              localMember.getHeartBeatWaitObject().wait(RaftServer.getHeartbeatIntervalMs());
++              localMember.getHeartBeatWaitObject().wait(ClusterConstant.getHeartbeatIntervalMs());
              }
              hasHadLeader = true;
              break;
            case FOLLOWER:
              // check if heartbeat times out
-             long heartBeatInterval =
+             long heartbeatInterval =
                  System.currentTimeMillis() - 
localMember.getLastHeartbeatReceivedTime();
-             if (heartBeatInterval >= 
ClusterConstant.getConnectionTimeoutInMS()) {
++
+             long randomElectionTimeout =
 -                RaftServer.getElectionTimeoutMs() + getElectionRandomWaitMs();
++                ClusterConstant.getElectionTimeoutMs() + 
getElectionRandomWaitMs();
+             if (heartbeatInterval >= randomElectionTimeout) {
                // the leader is considered dead, an election will be started 
in the next loop
                logger.info("{}: The leader {} timed out", memberName, 
localMember.getLeader());
                localMember.setCharacter(NodeCharacter.ELECTOR);
@@@ -231,19 -234,16 +235,17 @@@
                    HeartBeatResponse heartBeatResponse = 
client.sendHeartbeat(req);
                    heartbeatHandler.onComplete(heartBeatResponse);
                  } catch (TTransportException e) {
 -                  logger.warn(
 -                      memberName
 -                          + ": Cannot send heartbeat to node "
 -                          + node.toString()
 -                          + " due to network",
 -                      e);
 +                  if (ClusterIoTDB.printClientConnectionErrorStack) {
 +                    logger.warn(
-                         "{}: Cannot send heart beat to node {} due to 
network",
-                         memberName,
-                         node,
-                         e);
++                        "{}: Cannot send heartbeat to node {} due to 
network", memberName, node, e);
 +                  } else {
 +                    logger.warn(
-                         "{}: Cannot send heart beat to node {} due to 
network", memberName, node);
++                        "{}: Cannot send heartbeat to node {} due to 
network", memberName, node);
 +                  }
                    client.getInputProtocol().getTransport().close();
                  } catch (Exception e) {
-                   logger.warn("{}: Cannot send heart beat to node {}", 
memberName, node, e);
+                   logger.warn(
+                       memberName + ": Cannot send heart beat to node " + 
node.toString(), e);
                  } finally {
                    ClientUtils.putBackSyncHeartbeatClient(client);
                  }
@@@ -331,8 -329,8 +331,8 @@@
          logger.info(
              "{}: Wait for {}ms until election time out",
              memberName,
-             ClusterConstant.getConnectionTimeoutInMS());
-         
localMember.getTerm().wait(ClusterConstant.getConnectionTimeoutInMS());
 -            RaftServer.getElectionTimeoutMs());
 -        localMember.getTerm().wait(RaftServer.getElectionTimeoutMs());
++            ClusterConstant.getElectionTimeoutMs());
++        localMember.getTerm().wait(ClusterConstant.getElectionTimeoutMs());
        } catch (InterruptedException e) {
          logger.info(
              "{}: Unexpected interruption when waiting the result of election 
{}",
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index a927bb9,535a86c..dbad251
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@@ -197,35 -179,33 +198,36 @@@ public class DataGroupMember extends Ra
      lastAppliedPartitionTableVersion = new 
LastAppliedPatitionTableVersion(getMemberDir());
    }
  
 -  DataGroupMember(
 -      TProtocolFactory factory,
 -      PartitionGroup nodes,
 -      Node thisNode,
 -      MetaGroupMember metaGroupMember) {
 +  DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, 
MetaGroupMember metaGroupMember) {
 +    // The name is used in JMX, so we have to avoid to use "(" "," "=" ")"
      super(
 -        "Data("
 +        "Data-"
              + nodes.getHeader().getNode().getInternalIp()
 -            + ":"
 -            + nodes.getHeader().getNode().getMetaPort()
 -            + ", raftId="
 +            + "-"
 +            + nodes.getHeader().getNode().getDataPort()
 +            + "-raftId-"
              + nodes.getId()
 -            + ")",
 -        new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
 -        new SyncClientPool(new SyncDataClient.FactorySync(factory)),
 -        new AsyncClientPool(new 
AsyncDataHeartbeatClient.FactoryAsync(factory)),
 -        new SyncClientPool(new SyncDataHeartbeatClient.FactorySync(factory)),
 +            + "",
 +        new AsyncClientPool(new AsyncDataClient.Factory(factory)),
 +        new SyncClientPool(new SyncDataClient.Factory(factory)),
 +        new AsyncClientPool(new AsyncDataHeartbeatClient.Factory(factory)),
 +        new SyncClientPool(new SyncDataHeartbeatClient.Factory(factory)),
          new AsyncClientPool(new SingleManagerFactory(factory)));
 -    this.thisNode = thisNode;
      this.metaGroupMember = metaGroupMember;
      allNodes = nodes;
 +    mbeanName =
 +        String.format(
 +            "%s:%s=%s%d",
 +            "org.apache.iotdb.cluster.service",
 +            IoTDBConstant.JMX_TYPE,
 +            "DataMember",
 +            getRaftGroupId());
      setQueryManager(new ClusterQueryManager());
      slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(), 
getName());
-     LogApplier applier = new DataLogApplier(metaGroupMember, this);
-     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()) {
-       applier = new AsyncDataLogApplier(applier, name);
+     dataLogApplier = new DataLogApplier(metaGroupMember, this);
+     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
+         && ClusterDescriptor.getInstance().getConfig().getReplicationNum() != 
1) {
+       dataLogApplier = new AsyncDataLogApplier(dataLogApplier, name);
      }
      logManager =
          new FilePartitionedSnapshotLogManager(
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 04010a1,3da9dd8..5d71544
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@@ -1812,7 -1807,7 +1812,7 @@@ public class MetaGroupMember extends Ra
          new Thread(
                  () -> {
                    try {
-                     Thread.sleep(ClusterConstant.getHeartBeatIntervalMs());
 -                    Thread.sleep(RaftServer.getHeartbeatIntervalMs());
++                    Thread.sleep(ClusterConstant.getHeartbeatIntervalMs());
                    } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      // ignore
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 1f448d5,774e269..76a08b8
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@@ -986,7 -981,7 +986,7 @@@ public abstract class RaftMember implem
          // the node may have some inconsistent logs with the leader
          waitedTime = System.currentTimeMillis() - startTime;
          synchronized (syncLock) {
-           syncLock.wait(ClusterConstant.getHeartBeatIntervalMs());
 -          syncLock.wait(RaftServer.getHeartbeatIntervalMs());
++          syncLock.wait(ClusterConstant.getHeartbeatIntervalMs());
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 45881f0,0831b51..4fe7f29
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@@ -101,7 -101,7 +103,8 @@@ public class DataLogApplierTest extend
  
    private static final Logger logger = 
LoggerFactory.getLogger(DataLogApplierTest.class);
    private boolean partialWriteEnabled;
+   private boolean isPartitionEnabled;
 +
    private TestMetaGroupMember testMetaGroupMember =
        new TestMetaGroupMember() {
          @Override
@@@ -176,77 -177,80 +182,80 @@@
      NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
      partialWriteEnabled = 
IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
      IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
+     isPartitionEnabled = 
IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
 -    IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true);
 -    testMetaGroupMember.setClientProvider(
 -        new DataClientProvider(new Factory()) {
 -          @Override
 -          public AsyncDataClient getAsyncDataClient(Node node, int timeout) 
throws IOException {
 -            return new AsyncDataClient(null, null, node, null) {
 +    //    //TODO fixme : 恢复正常的provider
 +    ClusterIoTDB.getInstance()
 +        .setClientProvider(
 +            new DataClientProvider(new Factory()) {
                @Override
 -              public void getAllPaths(
 -                  RaftNode header,
 -                  List<String> path,
 -                  boolean withAlias,
 -                  AsyncMethodCallback<GetAllPathsResult> resultHandler) {
 -                new Thread(
 -                        () ->
 -                            new DataAsyncService(testDataGroupMember)
 -                                .getAllPaths(header, path, withAlias, 
resultHandler))
 -                    .start();
 -              }
 +              public AsyncDataClient getAsyncDataClient(Node node, int 
timeout) throws IOException {
 +                return new AsyncDataClient(null, null, node, null) {
 +                  @Override
 +                  public void getAllPaths(
 +                      RaftNode header,
 +                      List<String> path,
 +                      boolean withAlias,
 +                      AsyncMethodCallback<GetAllPathsResult> resultHandler) {
 +                    new Thread(
 +                            () ->
 +                                new DataAsyncService(testDataGroupMember)
 +                                    .getAllPaths(header, path, withAlias, 
resultHandler))
 +                        .start();
 +                  }
  
 -              @Override
 -              public void pullTimeSeriesSchema(
 -                  PullSchemaRequest request, 
AsyncMethodCallback<PullSchemaResp> resultHandler) {
 -                new Thread(
 -                        () -> {
 -                          List<TimeseriesSchema> timeseriesSchemas = new 
ArrayList<>();
 -                          for (String path : request.prefixPaths) {
 -                            if (path.startsWith(TestUtils.getTestSg(4))) {
 -                              for (int i = 0; i < 10; i++) {
 -                                
timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(4, i));
 +                  @Override
 +                  public void pullTimeSeriesSchema(
 +                      PullSchemaRequest request,
 +                      AsyncMethodCallback<PullSchemaResp> resultHandler) {
 +                    new Thread(
 +                            () -> {
 +                              List<TimeseriesSchema> timeseriesSchemas = new 
ArrayList<>();
 +                              for (String path : request.prefixPaths) {
 +                                if (path.startsWith(TestUtils.getTestSg(4))) {
 +                                  for (int i = 0; i < 10; i++) {
 +                                    
timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(4, i));
 +                                  }
 +                                } else if 
(!path.startsWith(TestUtils.getTestSg(5))) {
 +                                  resultHandler.onError(new 
StorageGroupNotSetException(path));
 +                                  return;
 +                                }
                                }
 -                            } else if (path.startsWith(TestUtils.getTestSg(1))
 -                                || path.startsWith(TestUtils.getTestSg(2))
 -                                || path.startsWith(TestUtils.getTestSg(3))) {
 -                              // do nothing
 -                            } else if 
(!path.startsWith(TestUtils.getTestSg(5))) {
 -                              resultHandler.onError(new 
StorageGroupNotSetException(path));
 -                              return;
 -                            }
 -                          }
 -                          PullSchemaResp resp = new PullSchemaResp();
 -                          // serialize the schemas
 -                          ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
 -                          DataOutputStream dataOutputStream =
 -                              new DataOutputStream(byteArrayOutputStream);
 -                          try {
 -                            
dataOutputStream.writeInt(timeseriesSchemas.size());
 -                            for (TimeseriesSchema timeseriesSchema : 
timeseriesSchemas) {
 -                              timeseriesSchema.serializeTo(dataOutputStream);
 -                            }
 -                          } catch (IOException ignored) {
 -                            // unreachable for we are using a 
ByteArrayOutputStream
 -                          }
 -                          
resp.setSchemaBytes(byteArrayOutputStream.toByteArray());
 -                          resultHandler.onComplete(resp);
 -                        })
 -                    .start();
 -              }
 +                              PullSchemaResp resp = new PullSchemaResp();
 +                              // serialize the schemas
 +                              ByteArrayOutputStream byteArrayOutputStream =
 +                                  new ByteArrayOutputStream();
 +                              DataOutputStream dataOutputStream =
 +                                  new DataOutputStream(byteArrayOutputStream);
 +                              try {
 +                                
dataOutputStream.writeInt(timeseriesSchemas.size());
 +                                for (TimeseriesSchema timeseriesSchema : 
timeseriesSchemas) {
 +                                  
timeseriesSchema.serializeTo(dataOutputStream);
 +                                }
 +                              } catch (IOException ignored) {
 +                                // unreachable for we are using a 
ByteArrayOutputStream
 +                              }
 +                              
resp.setSchemaBytes(byteArrayOutputStream.toByteArray());
 +                              resultHandler.onComplete(resp);
 +                            })
 +                        .start();
 +                  }
  
 -              @Override
 -              public void pullMeasurementSchema(
 -                  PullSchemaRequest request, 
AsyncMethodCallback<PullSchemaResp> resultHandler) {
 -                new Thread(
 -                        () ->
 -                            new DataAsyncService(testDataGroupMember)
 -                                .pullMeasurementSchema(request, 
resultHandler))
 -                    .start();
 +                  @Override
 +                  public void pullMeasurementSchema(
 +                      PullSchemaRequest request,
 +                      AsyncMethodCallback<PullSchemaResp> resultHandler) {
 +                    new Thread(
 +                            () ->
 +                                new DataAsyncService(testDataGroupMember)
 +                                    .pullMeasurementSchema(request, 
resultHandler))
 +                        .start();
 +                  }
 +                };
                }
 -            };
 -          }
 -        });
 +            });
      ((CMManager) IoTDB.metaManager).setMetaGroupMember(testMetaGroupMember);
+     testDataGroupMember.setMetaGroupMember(testMetaGroupMember);
+     applier = new DataLogApplier(testMetaGroupMember, testDataGroupMember);
    }
  
    @Override
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
index 13076a5,23c78b6..512e67b
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
@@@ -147,8 -148,9 +147,9 @@@ public class HeartbeatThreadTest 
  
    @Before
    public void setUp() throws Exception {
-     ClusterConstant.setElectionLeastTimeOutMs(20);
-     ClusterConstant.setElectionRandomTimeOutMs(30);
+     ClusterConstant.setElectionMaxWaitMs(50L);
 -    RaftServer.setHeartbeatIntervalMs(100L);
 -    RaftServer.setElectionTimeoutMs(1000L);
++    ClusterConstant.setHeartbeatIntervalMs(100L);
++    ClusterConstant.setElectionTimeoutMs(1000L);
      prevUseAsyncServer = 
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
      ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
      logManager = new TestLogManager(1);
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
index 44c58fb,d8cae9d..aa3e0f6
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
@@@ -111,11 -111,13 +112,17 @@@ public class BaseMember 
      IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
      RaftMember.setWaitLeaderTimeMs(10);
  
 -    syncLeaderMaxWait = RaftServer.getSyncLeaderMaxWaitMs();
 -    heartBeatInterval = RaftServer.getHeartbeatIntervalMs();
 -    electionTimeout = RaftServer.getElectionTimeoutMs();
 +    syncLeaderMaxWait = ClusterConstant.getSyncLeaderMaxWaitMs();
-     heartBeatInterval = ClusterConstant.getHeartBeatIntervalMs();
++    heartBeatInterval = ClusterConstant.getHeartbeatIntervalMs();
++    electionTimeout = ClusterConstant.getElectionTimeoutMs();
  
 -    RaftServer.setSyncLeaderMaxWaitMs(100);
 -    RaftServer.setHeartbeatIntervalMs(100);
 -    RaftServer.setElectionTimeoutMs(1000);
 +    ClusterConstant.setSyncLeaderMaxWaitMs(100);
-     ClusterConstant.setHeartBeatIntervalMs(100);
++    ClusterConstant.setHeartbeatIntervalMs(100);
++    ClusterConstant.setElectionTimeoutMs(1000);
++
++    electionTimeout = ClusterConstant.getElectionTimeoutMs();
++
++    ClusterConstant.setElectionTimeoutMs(1000);
  
      allNodes = new PartitionGroup();
      for (int i = 0; i < 100; i += 10) {
@@@ -192,8 -194,9 +199,9 @@@
      
ClusterDescriptor.getInstance().getConfig().setUseAsyncApplier(prevUseAsyncApplier);
      IoTDBDescriptor.getInstance().getConfig().setEnableWal(prevEnableWAL);
  
 -    RaftServer.setSyncLeaderMaxWaitMs(syncLeaderMaxWait);
 -    RaftServer.setHeartbeatIntervalMs(heartBeatInterval);
 -    RaftServer.setElectionTimeoutMs(electionTimeout);
 +    ClusterConstant.setSyncLeaderMaxWaitMs(syncLeaderMaxWait);
-     ClusterConstant.setHeartBeatIntervalMs(heartBeatInterval);
++    ClusterConstant.setHeartbeatIntervalMs(heartBeatInterval);
++    ClusterConstant.setElectionTimeoutMs(electionTimeout);
    }
  
    DataGroupMember getDataGroupMember(Node node) {
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 016a722,56f0bd8..33aee68
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@@ -691,8 -696,8 +691,8 @@@ public class MetaGroupMemberTest extend
    @Test
    public void testJoinClusterFailed() throws QueryProcessException {
      System.out.println("Start testJoinClusterFailed()");
-     long prevInterval = ClusterConstant.getHeartBeatIntervalMs();
-     ClusterConstant.setHeartBeatIntervalMs(10);
 -    long prevInterval = RaftServer.getHeartbeatIntervalMs();
 -    RaftServer.setHeartbeatIntervalMs(10);
++    long prevInterval = ClusterConstant.getHeartbeatIntervalMs();
++    ClusterConstant.setHeartbeatIntervalMs(10);
      ClusterDescriptor.getInstance().getConfig().setJoinClusterTimeOutMs(100);
      dummyResponse.set(Response.RESPONSE_NO_CONNECTION);
      MetaGroupMember newMember = getMetaGroupMember(TestUtils.getNode(10));
@@@ -703,7 -708,7 +703,7 @@@
        assertTrue(e instanceof StartUpCheckFailureException);
      } finally {
        newMember.closeLogManager();
-       ClusterConstant.setHeartBeatIntervalMs(prevInterval);
 -      RaftServer.setHeartbeatIntervalMs(prevInterval);
++      ClusterConstant.setHeartbeatIntervalMs(prevInterval);
      }
    }
  

Reply via email to