This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bf127a35403cc9426c02d52088c32ba006720f6d
Merge: 8d02321 df88404
Author: jt <[email protected]>
AuthorDate: Wed Jan 5 17:17:43 2022 +0800

    Merge branch 'client_manager_add_close' into expr
    
    # Conflicts:
    #   cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java

 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  11 +-
 .../apache/iotdb/cluster/client/BaseFactory.java   |  13 +
 .../apache/iotdb/cluster/client/ClientManager.java |  37 ++-
 .../iotdb/cluster/client/IClientManager.java       |   2 +
 .../cluster/client/async/AsyncDataClient.java      |   4 +
 .../log/sequencing/AsynchronousSequencer.java      |  12 +-
 .../iotdb/cluster/log/sequencing/LogSequencer.java |   2 +
 .../log/sequencing/SynchronousSequencer.java       |   5 +
 .../cluster/query/ClusterUDTFQueryExecutor.java    |   4 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |  20 +-
 .../iotdb/cluster/server/StoppedMemberManager.java |   6 +
 .../cluster/server/member/DataGroupMember.java     |   4 +-
 .../cluster/server/member/MetaGroupMember.java     |   4 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  28 +-
 .../cluster/server/service/DataAsyncService.java   |   2 +-
 .../cluster/server/service/DataGroupEngine.java    |   2 +
 .../cluster/server/service/DataSyncService.java    |   2 +-
 .../iotdb/cluster/client/ClientManagerTest.java    | 300 ++++++++++++---------
 .../cluster/client/ClientPoolFactoryTest.java      |   4 +
 .../iotdb/cluster/client/MockClientManager.java    |   3 +
 .../cluster/client/async/AsyncDataClientTest.java  |   2 +
 .../cluster/client/async/AsyncMetaClientTest.java  |   2 +
 .../iotdb/cluster/common/TestDataGroupMember.java  |   6 +-
 .../iotdb/cluster/common/TestMetaGroupMember.java  |   7 +-
 .../iotdb/cluster/log/LogDispatcherTest.java       |  13 +-
 .../cluster/log/applier/DataLogApplierTest.java    |   5 +-
 .../cluster/log/applier/MetaLogApplierTest.java    |   1 -
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java |   1 -
 .../cluster/log/catchup/LogCatchUpTaskTest.java    |   1 -
 .../log/catchup/SnapshotCatchUpTaskTest.java       |   1 -
 .../cluster/log/snapshot/DataSnapshotTest.java     |   2 -
 .../log/snapshot/MetaSimpleSnapshotTest.java       |   1 -
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |   2 -
 .../query/ClusterAggregateExecutorTest.java        |   7 +
 .../query/ClusterDataQueryExecutorTest.java        |  87 ++++++
 .../query/fill/ClusterFillExecutorTest.java        |   7 +
 .../ClusterGroupByNoVFilterDataSetTest.java        |   7 +
 .../groupby/ClusterGroupByVFilterDataSetTest.java  |   7 +
 .../query/groupby/MergeGroupByExecutorTest.java    |   7 +
 .../query/groupby/RemoteGroupByExecutorTest.java   |   7 +
 .../query/last/ClusterLastQueryExecutorTest.java   |   7 +
 .../query/reader/ClusterReaderFactoryTest.java     |   7 +
 .../query/reader/ClusterTimeGeneratorTest.java     |   7 +
 .../cluster/query/reader/DatasourceInfoTest.java   |   4 +
 .../reader/RemoteSeriesReaderByTimestampTest.java  |   3 +
 .../query/reader/RemoteSimpleSeriesReaderTest.java |   4 +
 .../mult/AssignPathManagedMergeReaderTest.java     |   4 +
 .../reader/mult/RemoteMultSeriesReaderTest.java    |   7 +
 .../server/clusterinfo/ClusterInfoServerTest.java  |   6 +-
 .../clusterinfo/ClusterInfoServiceImplTest.java    |   9 +-
 .../caller/AppendNodeEntryHandlerTest.java         |   1 -
 .../handlers/caller/ElectionHandlerTest.java       |   1 -
 .../handlers/caller/HeartbeatHandlerTest.java      |   1 -
 .../handlers/caller/LogCatchUpHandlerTest.java     |   1 -
 .../server/heartbeat/DataHeartbeatThreadTest.java  |   2 +-
 .../server/heartbeat/HeartbeatThreadTest.java      |   1 -
 .../iotdb/cluster/server/member/BaseMember.java    |  16 +-
 .../cluster/server/member/DataGroupMemberTest.java |  12 +-
 .../cluster/server/member/MetaGroupMemberTest.java |  18 +-
 .../cluster/server/member/RaftMemberTest.java      |  15 ++
 .../UserGuide/Data-Concept/Auto-Create-MetaData.md |  14 +-
 docs/UserGuide/Data-Concept/Compression.md         |  95 ++++++-
 .../Data-Concept/Data-Model-and-Terminology.md     |  37 ++-
 docs/UserGuide/Data-Concept/Data-Type.md           |  14 +-
 docs/UserGuide/Data-Concept/Encoding.md            |  11 +-
 docs/UserGuide/Data-Concept/SDT.md                 | 111 --------
 docs/UserGuide/Data-Concept/Schema-Template.md     |   8 +-
 docs/UserGuide/Data-Concept/Time-Partition.md      |   8 +-
 .../DML-Data-Manipulation-Language.md              |  33 ++-
 .../UserGuide/Data-Concept/Auto-Create-MetaData.md |  14 +-
 docs/zh/UserGuide/Data-Concept/Compression.md      |  90 ++++++-
 .../Data-Concept/Data-Model-and-Terminology.md     |  34 ++-
 docs/zh/UserGuide/Data-Concept/Data-Type.md        |  18 +-
 docs/zh/UserGuide/Data-Concept/Encoding.md         |   6 +-
 docs/zh/UserGuide/Data-Concept/SDT.md              | 106 --------
 docs/zh/UserGuide/Data-Concept/Schema-Template.md  |  32 +--
 docs/zh/UserGuide/Data-Concept/Time-Partition.md   |   8 +-
 .../DML-Data-Manipulation-Language.md              |  29 +-
 .../integration/IOTDBGroupByInnerIntervalIT.java   |   2 +-
 .../IoTDBQueryWithComplexValueFilterIT.java        | 122 +++++++++
 .../db/integration/aligned/IoTDBDeletionIT.java    |  31 +++
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |   4 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  51 ++--
 .../db/engine/querycontext/QueryDataSource.java    |  45 ++++
 .../db/engine/storagegroup/TsFileProcessor.java    |  74 ++---
 .../db/engine/storagegroup/TsFileResource.java     | 209 ++++++++++----
 .../storagegroup/VirtualStorageGroupProcessor.java |  61 +++--
 .../virtualSg/StorageGroupManager.java             |   9 +
 .../org/apache/iotdb/db/metadata/MManager.java     |  21 +-
 .../org/apache/iotdb/db/metadata/mtree/MTree.java  |  23 +-
 .../apache/iotdb/db/metadata/path/AlignedPath.java |   6 +-
 .../iotdb/db/metadata/path/MeasurementPath.java    |   8 +-
 .../apache/iotdb/db/metadata/path/PartialPath.java |   7 +
 .../apache/iotdb/db/metadata/tag/TagManager.java   |  24 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |   4 +
 .../iotdb/db/query/context/QueryContext.java       |   9 +
 .../db/query/control/QueryResourceManager.java     | 110 +++++++-
 .../db/query/control/tracing/TracingInfo.java      |   8 +
 .../db/query/control/tracing/TracingManager.java   |   4 +
 .../dataset/RawQueryDataSetWithoutValueFilter.java |  17 +-
 .../groupby/GroupByWithValueFilterDataSet.java     |  30 ++-
 .../groupby/GroupByWithoutValueFilterDataSet.java  |  24 +-
 .../dataset/groupby/LocalGroupByExecutor.java      |   5 +-
 .../db/query/executor/AggregationExecutor.java     |  49 +++-
 .../iotdb/db/query/executor/FillQueryExecutor.java | 113 +++++---
 .../iotdb/db/query/executor/LastQueryExecutor.java |  14 +-
 .../db/query/executor/RawDataQueryExecutor.java    |  38 ++-
 .../iotdb/db/query/executor/UDFQueryExecutor.java  |   4 +-
 .../metadata/MemAlignedChunkMetadataLoader.java    |   4 +-
 .../chunk/metadata/MemChunkMetadataLoader.java     |   4 +-
 .../query/reader/series/SeriesAggregateReader.java |  27 ++
 .../reader/series/SeriesRawDataBatchReader.java    |   1 -
 .../iotdb/db/query/reader/series/SeriesReader.java | 212 ++++++++++-----
 .../reader/series/SeriesReaderByTimestamp.java     |  27 ++
 .../query/timegenerator/ServerTimeGenerator.java   |  60 ++++-
 .../db/service/basic/BasicServiceProvider.java     |   3 +-
 .../db/service/basic/QueryFrequencyRecorder.java   |  19 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |   1 +
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |   5 +-
 .../java/org/apache/iotdb/db/utils/QueryUtils.java |  26 ++
 .../db/utils/datastructure/AlignedTVList.java      |  20 +-
 .../engine/modification/DeletionFileNodeTest.java  |  44 ++-
 .../storagegroup/StorageGroupProcessorTest.java    |  70 ++++-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |  23 +-
 .../engine/storagegroup/TsFileProcessorTest.java   |  40 +--
 .../iotdb/db/metadata/MManagerBasicTest.java       |  48 +++-
 .../reader/series/SeriesAggregateReaderTest.java   |   6 +-
 .../reader/series/SeriesReaderByTimestampTest.java |   6 +-
 site/src/main/.vuepress/config.js                  |  10 +-
 .../read/query/timegenerator/TimeGenerator.java    |   3 +
 .../query/timegenerator/TsFileTimeGenerator.java   |   6 +
 .../tsfile/read/reader/FakedTimeGenerator.java     |   6 +
 132 files changed, 2294 insertions(+), 935 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 60d2192,0104bc4..10b6df8
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@@ -155,7 -155,7 +155,7 @@@ public class ClusterIoTDB implements Cl
      TProtocolFactory protocolFactory =
          ThriftServiceThread.getProtocolFactory(
              
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
--    metaGroupMember = new MetaGroupMember(protocolFactory, thisNode, 
coordinator);
++    metaGroupMember = new MetaGroupMember(thisNode, coordinator);
      IoTDB.setClusterMode();
      IoTDB.setMetaManager(CMManager.getInstance());
      ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupMember);
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 0def4d0,0000000..c6729dd
mode 100644,000000..100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@@ -1,148 -1,0 +1,154 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.cluster.log.sequencing;
 +
 +import org.apache.iotdb.cluster.log.Log;
 +import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 +import org.apache.iotdb.cluster.log.VotingLog;
 +import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 +import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 +import org.apache.iotdb.cluster.server.member.RaftMember;
 +import org.apache.iotdb.cluster.server.monitor.Timer;
 +import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +public class AsynchronousSequencer implements LogSequencer {
 +
 +  private static final Logger logger = 
LoggerFactory.getLogger(AsynchronousSequencer.class);
-   private static final ExecutorService SEQUENCER_POOL =
-       IoTDBThreadPoolFactory.newCachedThreadPool("SequencerPool");
 +  private static final int SEQUENCER_PARALLELISM = 4;
 +
++  private ExecutorService sequencerPool =
++      IoTDBThreadPoolFactory.newFixedThreadPool(SEQUENCER_PARALLELISM, 
"SequencerPool");
++
 +  private RaftMember member;
 +  private RaftLogManager logManager;
 +
 +  private BlockingQueue<SendLogRequest> unsequencedLogQueue;
 +
 +  public AsynchronousSequencer(RaftMember member, RaftLogManager logManager) {
 +    this.member = member;
 +    this.logManager = logManager;
 +    unsequencedLogQueue = new ArrayBlockingQueue<>(4096);
 +    for (int i = 0; i < SEQUENCER_PARALLELISM; i++) {
-       SEQUENCER_POOL.submit(this::sequenceTask);
++      sequencerPool.submit(this::sequenceTask);
 +    }
 +  }
 +
 +  public SendLogRequest enqueueSendLogRequest(Log log) {
 +    VotingLog votingLog = member.buildVotingLog(log);
 +    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
 +    AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
 +
 +    SendLogRequest request =
 +        new SendLogRequest(
 +            votingLog, leaderShipStale, newLeaderTerm, null, 
member.getAllNodes().size() / 2);
 +    try {
 +      unsequencedLogQueue.put(request);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      logger.warn("Interrupted while putting {}", log);
 +    }
 +    return request;
 +  }
 +
 +  private void sequenceLogs(List<SendLogRequest> sendLogRequests) {
 +    long startTime;
 +    synchronized (logManager) {
 +      for (SendLogRequest sendLogRequest : sendLogRequests) {
 +        Log log = sendLogRequest.getVotingLog().getLog();
 +        log.setCurrLogTerm(member.getTerm().get());
 +        log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 +        if (log instanceof PhysicalPlanLog) {
 +          ((PhysicalPlanLog) log).getPlan().setIndex(log.getCurrLogIndex());
 +        }
 +
 +        startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
 +        // just like processPlanLocally,we need to check the size of log
 +
 +        // logDispatcher will serialize log, and set log size, and we will use the size after it
 +        logManager.append(log);
 +        
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
 +
 +        AppendEntryRequest appendEntryRequest = 
member.buildAppendEntryRequest(log, false);
 +        sendLogRequest.setAppendEntryRequest(appendEntryRequest);
 +
 +        startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
 +        log.setCreateTime(System.nanoTime());
 +        member.getVotingLogList().insert(sendLogRequest.getVotingLog());
 +        member.getLogDispatcher().offer(sendLogRequest);
 +        
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
 +      }
 +    }
 +    sendLogRequests.clear();
 +  }
 +
 +  private void sequenceTask() {
 +    List<SendLogRequest> sendLogRequests = new ArrayList<>();
 +    while (!Thread.interrupted()) {
 +      try {
 +        synchronized (unsequencedLogQueue) {
 +          SendLogRequest request = unsequencedLogQueue.take();
 +          sendLogRequests.add(request);
 +          unsequencedLogQueue.drainTo(sendLogRequests);
 +        }
 +
 +        sequenceLogs(sendLogRequests);
 +      } catch (InterruptedException e) {
 +        Thread.currentThread().interrupt();
 +        return;
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public SendLogRequest sequence(Log log) {
 +    return enqueueSendLogRequest(log);
 +  }
 +
 +  @Override
 +  public void setLogManager(RaftLogManager logManager) {
 +    this.logManager = logManager;
 +  }
 +
 +  public static class Factory implements LogSequencerFactory {
 +
 +    @Override
 +    public LogSequencer create(RaftMember member, RaftLogManager logManager) {
 +      return new AsynchronousSequencer(member, logManager);
 +    }
 +  }
++
++  @Override
++  public void close() {
++    sequencerPool.shutdownNow();
++  }
 +}
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java
index b418162,0000000..ded77c5
mode 100644,000000..100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java
@@@ -1,42 -1,0 +1,44 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.cluster.log.sequencing;
 +
 +import org.apache.iotdb.cluster.log.Log;
 +import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 +import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 +
 +/**
 + * LogSequencer assigns a unique index and associated term to a log entry and offers the entry to a
 + * LogDispatcher which will send the entry to followers.
 + */
 +public interface LogSequencer {
 +
 +  /**
 +   * assigns a unique index and associated term to a log entry and offers the entry to a
 +   * LogDispatcher which will send the entry to followers.
 +   *
 +   * @param log a log entry that is not yet indexed.
 +   * @return A SendLogRequest through which the caller can monitor the status of the sending entry.
 +   */
 +  SendLogRequest sequence(Log log);
 +
 +  void setLogManager(RaftLogManager logManager);
++
++  void close();
 +}
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index a749960,0000000..19b40a0
mode 100644,000000..100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@@ -1,115 -1,0 +1,120 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.cluster.log.sequencing;
 +
 +import org.apache.iotdb.cluster.log.Log;
 +import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 +import org.apache.iotdb.cluster.log.VotingLog;
 +import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 +import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 +import org.apache.iotdb.cluster.server.member.RaftMember;
 +import org.apache.iotdb.cluster.server.monitor.Timer;
 +import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 +
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +/**
 + * SynchronousSequencer performs sequencing by taking the monitor of a LogManager within the caller
 + * thread.
 + */
 +public class SynchronousSequencer implements LogSequencer {
 +
 +  private RaftMember member;
 +  private RaftLogManager logManager;
 +
 +  public SynchronousSequencer(RaftMember member, RaftLogManager logManager) {
 +    this.member = member;
 +    this.logManager = logManager;
 +  }
 +
 +  @Override
 +  public SendLogRequest sequence(Log log) {
 +    SendLogRequest sendLogRequest;
 +
 +    long startTime =
 +        
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
 +    synchronized (logManager) {
 +      
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
 +          startTime);
 +
 +      log.setCurrLogTerm(member.getTerm().get());
 +      log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 +
 +      if (log instanceof PhysicalPlanLog) {
 +        PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
 +        physicalPlanLog.getPlan().setIndex(log.getCurrLogIndex());
 +      }
 +
 +      startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
 +
 +      // logDispatcher will serialize log, and set log size, and we will use the size after it
 +      logManager.append(log);
 +      
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
 +
 +      startTime = 
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
 +      sendLogRequest = buildSendLogRequest(log);
 +      
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
 +
 +      startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
 +      log.setCreateTime(System.nanoTime());
 +      member.getVotingLogList().insert(sendLogRequest.getVotingLog());
 +      member.getLogDispatcher().offer(sendLogRequest);
 +      
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
 +    }
 +    return sendLogRequest;
 +  }
 +
 +  @Override
 +  public void setLogManager(RaftLogManager logManager) {
 +    this.logManager = logManager;
 +  }
 +
 +  private SendLogRequest buildSendLogRequest(Log log) {
 +    VotingLog votingLog = member.buildVotingLog(log);
 +    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
 +    AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
 +
 +    long startTime = 
Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime();
 +    AppendEntryRequest appendEntryRequest = 
member.buildAppendEntryRequest(log, false);
 +    
Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
 +
 +    return new SendLogRequest(
 +        votingLog,
 +        leaderShipStale,
 +        newLeaderTerm,
 +        appendEntryRequest,
 +        member.getAllNodes().size() / 2);
 +  }
 +
 +  public static class Factory implements LogSequencerFactory {
 +
 +    @Override
 +    public LogSequencer create(RaftMember member, RaftLogManager logManager) {
 +      return new SynchronousSequencer(member, logManager);
 +    }
 +  }
++
++  @Override
++  public void close() {
++
++  }
 +}
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 533adf8,36ee967..2a80781
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@@ -170,7 -170,7 +170,7 @@@ public class DataGroupMember extends Ra
    private LastAppliedPatitionTableVersion lastAppliedPartitionTableVersion;
  
    @TestOnly
--  public DataGroupMember(PartitionGroup nodes) {
++  public DataGroupMember(Node thisNode, PartitionGroup nodes) {
      // constructor for test
      this.name =
          "Data-"
@@@ -180,7 -180,7 +180,8 @@@
              + "-raftId-"
              + nodes.getRaftId()
              + "";
 -    allNodes = nodes;
++    setThisNode(thisNode);
 +    setAllNodes(nodes);
      mbeanName =
          String.format(
              "%s:%s=%s%d",
@@@ -191,13 -191,9 +192,14 @@@
      setQueryManager(new ClusterQueryManager());
      localQueryExecutor = new LocalQueryExecutor(this);
      lastAppliedPartitionTableVersion = new 
LastAppliedPatitionTableVersion(getMemberDir());
 +    appenderFactory =
 +        
ClusterDescriptor.getInstance().getConfig().isUseFollowerSlidingWindow()
 +            ? new SlidingWindowLogAppender.Factory()
 +            : new BlockingLogAppender.Factory();
++    logSequencer = SEQUENCER_FACTORY.create(this, logManager);
    }
  
 -  DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, 
MetaGroupMember metaGroupMember) {
 +  DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember 
metaGroupMember) {
      // The name is used in JMX, so we have to avoid to use "(" "," "=" ")"
      super(
          "Data-"
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 4764895,f057131..fc2ddc0
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@@ -209,13 -210,13 +209,14 @@@ public class MetaGroupMember extends Ra
    @TestOnly
    public MetaGroupMember() {}
  
--  public MetaGroupMember(TProtocolFactory factory, Node thisNode, Coordinator 
coordinator) {
++  public MetaGroupMember(Node thisNode, Coordinator coordinator) {
      super(
          "Meta",
          new ClientManager(
              ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
              ClientManager.Type.MetaGroupClient));
 -    allNodes = new PartitionGroup();
++    setThisNode(thisNode);
 +    setAllNodes(new PartitionGroup());
      initPeerMap();
  
      // committed logs are applied to the state machine (the IoTDB instance) 
through the applier
@@@ -225,7 -225,7 +226,6 @@@
      term.set(logManager.getHardState().getCurrentTerm());
      voteFor = logManager.getHardState().getVoteFor();
  
--    setThisNode(thisNode);
      // load the identifier from the disk or generate a new one
      loadIdentifier();
      allNodes.add(thisNode);
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4d69153,d74579c..ea3b463
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@@ -356,6 -304,10 +356,13 @@@ public abstract class RaftMember implem
    public void stop() {
      setSkipElection(true);
      closeLogManager();
+     if (clientManager != null) {
+       clientManager.close();
+     }
++    if (logSequencer != null) {
++      logSequencer.close();
++    }
+ 
      if (heartBeatService == null) {
        return;
      }
@@@ -772,20 -652,6 +779,24 @@@
  
    public void setAllNodes(PartitionGroup allNodes) {
      this.allNodes = allNodes;
++    if (allNodes.isEmpty()) {
++      return;
++    }
++
 +    this.votingLogList = new VotingLogList(allNodes.size() / 2);
 +
 +    // update the reference of thisNode to keep consistency
 +    boolean foundThisNode = false;
 +    for (Node node : allNodes) {
 +      if (ClusterUtils.isNodeEquals(node, thisNode)) {
 +        thisNode = node;
 +        foundThisNode = true;
 +        break;
 +      }
 +    }
 +    if (!foundThisNode) {
 +      logger.error("{}: did not find this node {}, in the raft group {}", 
name, thisNode, allNodes);
 +    }
    }
  
    public Map<Node, Long> getLastCatchUpResponseTime() {
@@@ -1661,29 -1516,18 +1672,28 @@@
     * one follower tells the node that it is no longer a valid leader, or a 
timeout is triggered.
     */
    @SuppressWarnings({"java:S2445"}) // safe synchronized
 -  private AppendLogResult waitAppendResult(
 -      AtomicInteger voteCounter, AtomicBoolean leaderShipStale, AtomicLong 
newLeaderTerm) {
 +  protected AppendLogResult waitAppendResult(
 +      VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, 
int quorumSize) {
      // wait for the followers to vote
      long startTime = 
Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
 -    synchronized (voteCounter) {
 +    long nextTimeToPrint = 15000;
 +
 +    int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
 +    int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
 +    int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
 +
 +    synchronized (log) {
        long waitStart = System.currentTimeMillis();
        long alreadyWait = 0;
-       while (
-           log.getLog().getCurrLogIndex() == -1 ||
-           stronglyAcceptedNodeNum < quorumSize
-           && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
-               || (totalAccepted < quorumSize)
-               || votingLogList.size() > config.getMaxNumOfLogsInMem())
 -      while (voteCounter.get() > 0
--          && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
-           && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
 -          && voteCounter.get() != Integer.MAX_VALUE) {
++      while (log.getLog().getCurrLogIndex() == -1
++          || stronglyAcceptedNodeNum < quorumSize
++              && (!(ENABLE_WEAK_ACCEPTANCE && 
canBeWeaklyAccepted(log.getLog()))
++                  || (totalAccepted < quorumSize)
++                  || votingLogList.size() > config.getMaxNumOfLogsInMem())
++              && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
++              && 
!log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
          try {
 -          voteCounter.wait(ClusterConstant.getWriteOperationTimeoutMS());
 +          log.wait(1);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Unexpected interruption when sending a log", e);
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
index 6056fff,6056fff..4df6a90
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
@@@ -31,14 -31,14 +31,14 @@@ import java.util.Collections
  public class TestDataGroupMember extends DataGroupMember {
  
    public TestDataGroupMember() {
--    super(new 
PartitionGroup(Collections.singletonList(TestUtils.getNode(0))));
++    super(TestUtils.getNode(0),
++        new PartitionGroup(Collections.singletonList(TestUtils.getNode(0))));
      setQueryManager(new ClusterQueryManager());
      this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null, "");
    }
  
    public TestDataGroupMember(Node thisNode, PartitionGroup allNodes) {
--    super(allNodes);
--    this.thisNode = thisNode;
++    super(thisNode, allNodes);
      this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null, "");
      setQueryManager(new ClusterQueryManager());
    }
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
index 8320ea7,761f5e5..c0bd588
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
@@@ -33,16 -35,5 +33,17 @@@ public class TestMetaGroupMember extend
      MetaSingleSnapshotLogManager manager =
          new MetaSingleSnapshotLogManager(new TestLogApplier(), this);
      setLogManager(manager);
 +
 +    PartitionGroup group = new PartitionGroup();
 +    thisNode = TestUtils.getNode(0);
 +    for (int i = 0; i < 10; i++) {
 +      group.add(TestUtils.getNode(i));
 +    }
 +    setAllNodes(group);
 +
-     this.clientManager = new ClientManager(
-         ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
-         ClientManager.Type.MetaGroupClient);
++    this.clientManager =
++        new ClientManager(
++            ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
++            ClientManager.Type.MetaGroupClient);
    }
  }
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
index 1ff27b5,31f553f..2f7b90c
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
@@@ -19,9 -19,6 +19,8 @@@
  
  package org.apache.iotdb.cluster.log;
  
- import org.apache.iotdb.cluster.client.ClientCategory;
 +import org.apache.iotdb.cluster.client.ClientManager;
 +import org.apache.iotdb.cluster.client.ClientManager.Type;
  import org.apache.iotdb.cluster.common.TestAsyncClient;
  import org.apache.iotdb.cluster.common.TestMetaGroupMember;
  import org.apache.iotdb.cluster.common.TestSyncClient;
@@@ -110,38 -104,6 +109,41 @@@ public class LogDispatcherTest 
            }
  
            @Override
 +          public AsyncClient getAsyncClient(Node node) {
 +            return new TestAsyncClient() {
 +
 +              @Override
-               public void appendEntry(AppendEntryRequest request,
-                   AsyncMethodCallback<AppendEntryResult> resultHandler) throws TException {
++              public void appendEntry(
++                  AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
++                  throws TException {
 +                try {
 +                  if (!downNode.contains(node)) {
 +                    resultHandler.onComplete(mockedAppendEntry(request));
 +                  }
 +                  resultHandler.onComplete(new AppendEntryResult(-1));
 +                } catch (UnknownLogTypeException e) {
 +                  throw new TException(e);
 +                }
 +              }
 +
 +              @Override
-               public void appendEntries(AppendEntriesRequest request,
-                   AsyncMethodCallback<AppendEntryResult> resultHandler) throws TException {
++              public void appendEntries(
++                  AppendEntriesRequest request,
++                  AsyncMethodCallback<AppendEntryResult> resultHandler)
++                  throws TException {
 +                try {
 +                  if (!downNode.contains(node)) {
 +                    resultHandler.onComplete(mockedAppendEntries(request));
 +                  }
 +                  resultHandler.onComplete(new AppendEntryResult(-1));
 +                } catch (UnknownLogTypeException e) {
 +                  throw new TException(e);
 +                }
 +              }
 +            };
 +          }
 +
 +          @Override
            public Client getSyncClient(Node node) {
              return new TestSyncClient() {
                @Override
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index b2f8e01,34c1458..7eb96a3
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@@ -94,7 -93,7 +94,6 @@@ import org.apache.iotdb.tsfile.write.sc
  import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
  
  import org.apache.thrift.async.AsyncMethodCallback;
--import org.apache.thrift.protocol.TCompactProtocol.Factory;
  import org.junit.After;
  import org.junit.Before;
  import org.junit.Test;
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 4e4f82f,736b269..99aa6d3
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@@ -348,9 -354,9 +351,9 @@@ public class MetaGroupMemberTest extend
    }
  
    @Override
--  protected MetaGroupMember getMetaGroupMember(Node node) throws QueryProcessException {
++  protected MetaGroupMember getMetaGroupMember(Node node) {
      MetaGroupMember metaGroupMember =
--        new MetaGroupMember(new Factory(), node, new Coordinator()) {
++        new MetaGroupMember(node, new Coordinator()) {
  
            @Override
            public void applyAddNode(AddNodeLog addNodeLog) {
@@@ -809,7 -883,7 +812,7 @@@
      System.out.println("Start testProcessNonQuery()");
      mockDataClusterServer = true;
  
--    MetaGroupMember testMetaMember2 = getMetaGroupMember(TestUtils.getNode(2));
++    MetaGroupMember testMetaMember2 = getMetaGroupMember(TestUtils.getNode(20));
      testMetaMember2.setCharacter(LEADER);
  
      // as a follower

Reply via email to