This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch expr in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bf127a35403cc9426c02d52088c32ba006720f6d Merge: 8d02321 df88404 Author: jt <[email protected]> AuthorDate: Wed Jan 5 17:17:43 2022 +0800 Merge branch 'client_manager_add_close' into expr # Conflicts: # cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java .../org/apache/iotdb/cluster/ClusterIoTDB.java | 11 +- .../apache/iotdb/cluster/client/BaseFactory.java | 13 + .../apache/iotdb/cluster/client/ClientManager.java | 37 ++- .../iotdb/cluster/client/IClientManager.java | 2 + .../cluster/client/async/AsyncDataClient.java | 4 + .../log/sequencing/AsynchronousSequencer.java | 12 +- .../iotdb/cluster/log/sequencing/LogSequencer.java | 2 + .../log/sequencing/SynchronousSequencer.java | 5 + .../cluster/query/ClusterUDTFQueryExecutor.java | 4 +- .../iotdb/cluster/query/LocalQueryExecutor.java | 20 +- .../iotdb/cluster/server/StoppedMemberManager.java | 6 + .../cluster/server/member/DataGroupMember.java | 4 +- .../cluster/server/member/MetaGroupMember.java | 4 +- .../iotdb/cluster/server/member/RaftMember.java | 28 +- .../cluster/server/service/DataAsyncService.java | 2 +- .../cluster/server/service/DataGroupEngine.java | 2 + .../cluster/server/service/DataSyncService.java | 2 +- .../iotdb/cluster/client/ClientManagerTest.java | 300 ++++++++++++--------- .../cluster/client/ClientPoolFactoryTest.java | 4 + .../iotdb/cluster/client/MockClientManager.java | 3 + .../cluster/client/async/AsyncDataClientTest.java | 2 + .../cluster/client/async/AsyncMetaClientTest.java | 2 + .../iotdb/cluster/common/TestDataGroupMember.java | 6 +- .../iotdb/cluster/common/TestMetaGroupMember.java | 7 +- .../iotdb/cluster/log/LogDispatcherTest.java | 13 +- .../cluster/log/applier/DataLogApplierTest.java | 5 +- .../cluster/log/applier/MetaLogApplierTest.java | 1 - .../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 1 - .../cluster/log/catchup/LogCatchUpTaskTest.java | 1 - .../log/catchup/SnapshotCatchUpTaskTest.java | 1 - .../cluster/log/snapshot/DataSnapshotTest.java | 2 - .../log/snapshot/MetaSimpleSnapshotTest.java | 1 - .../cluster/log/snapshot/PullSnapshotTaskTest.java | 2 - .../query/ClusterAggregateExecutorTest.java | 7 + .../query/ClusterDataQueryExecutorTest.java | 87 ++++++ .../query/fill/ClusterFillExecutorTest.java | 7 + .../ClusterGroupByNoVFilterDataSetTest.java | 7 + .../groupby/ClusterGroupByVFilterDataSetTest.java | 7 + .../query/groupby/MergeGroupByExecutorTest.java | 7 + .../query/groupby/RemoteGroupByExecutorTest.java | 7 + .../query/last/ClusterLastQueryExecutorTest.java | 7 + .../query/reader/ClusterReaderFactoryTest.java | 7 + .../query/reader/ClusterTimeGeneratorTest.java | 7 + .../cluster/query/reader/DatasourceInfoTest.java | 4 + .../reader/RemoteSeriesReaderByTimestampTest.java | 3 + .../query/reader/RemoteSimpleSeriesReaderTest.java | 4 + .../mult/AssignPathManagedMergeReaderTest.java | 4 + .../reader/mult/RemoteMultSeriesReaderTest.java | 7 + .../server/clusterinfo/ClusterInfoServerTest.java | 6 +- .../clusterinfo/ClusterInfoServiceImplTest.java | 9 +- .../caller/AppendNodeEntryHandlerTest.java | 1 - .../handlers/caller/ElectionHandlerTest.java | 1 - .../handlers/caller/HeartbeatHandlerTest.java | 1 - .../handlers/caller/LogCatchUpHandlerTest.java | 1 - .../server/heartbeat/DataHeartbeatThreadTest.java | 2 +- .../server/heartbeat/HeartbeatThreadTest.java | 1 - .../iotdb/cluster/server/member/BaseMember.java | 16 +- .../cluster/server/member/DataGroupMemberTest.java | 12 +- .../cluster/server/member/MetaGroupMemberTest.java | 18 +- .../cluster/server/member/RaftMemberTest.java | 15 ++ .../UserGuide/Data-Concept/Auto-Create-MetaData.md | 14 +- docs/UserGuide/Data-Concept/Compression.md | 95 ++++++- .../Data-Concept/Data-Model-and-Terminology.md | 37 ++- docs/UserGuide/Data-Concept/Data-Type.md | 14 +- docs/UserGuide/Data-Concept/Encoding.md | 11 +- docs/UserGuide/Data-Concept/SDT.md | 111 -------- docs/UserGuide/Data-Concept/Schema-Template.md | 8 +- docs/UserGuide/Data-Concept/Time-Partition.md | 8 +- .../DML-Data-Manipulation-Language.md | 33 ++- .../UserGuide/Data-Concept/Auto-Create-MetaData.md | 14 +- docs/zh/UserGuide/Data-Concept/Compression.md | 90 ++++++- .../Data-Concept/Data-Model-and-Terminology.md | 34 ++- docs/zh/UserGuide/Data-Concept/Data-Type.md | 18 +- docs/zh/UserGuide/Data-Concept/Encoding.md | 6 +- docs/zh/UserGuide/Data-Concept/SDT.md | 106 -------- docs/zh/UserGuide/Data-Concept/Schema-Template.md | 32 +-- docs/zh/UserGuide/Data-Concept/Time-Partition.md | 8 +- .../DML-Data-Manipulation-Language.md | 29 +- .../integration/IOTDBGroupByInnerIntervalIT.java | 2 +- .../IoTDBQueryWithComplexValueFilterIT.java | 122 +++++++++ .../db/integration/aligned/IoTDBDeletionIT.java | 31 +++ .../apache/iotdb/session/IoTDBSessionSimpleIT.java | 4 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 51 ++-- .../db/engine/querycontext/QueryDataSource.java | 45 ++++ .../db/engine/storagegroup/TsFileProcessor.java | 74 ++--- .../db/engine/storagegroup/TsFileResource.java | 209 ++++++++++---- .../storagegroup/VirtualStorageGroupProcessor.java | 61 +++-- .../virtualSg/StorageGroupManager.java | 9 + .../org/apache/iotdb/db/metadata/MManager.java | 21 +- .../org/apache/iotdb/db/metadata/mtree/MTree.java | 23 +- .../apache/iotdb/db/metadata/path/AlignedPath.java | 6 +- .../iotdb/db/metadata/path/MeasurementPath.java | 8 +- .../apache/iotdb/db/metadata/path/PartialPath.java | 7 + .../apache/iotdb/db/metadata/tag/TagManager.java | 24 +- .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 4 + .../iotdb/db/query/context/QueryContext.java | 9 + .../db/query/control/QueryResourceManager.java | 110 +++++++- .../db/query/control/tracing/TracingInfo.java | 8 + .../db/query/control/tracing/TracingManager.java | 4 + .../dataset/RawQueryDataSetWithoutValueFilter.java | 17 +- .../groupby/GroupByWithValueFilterDataSet.java | 30 ++- .../groupby/GroupByWithoutValueFilterDataSet.java | 24 +- .../dataset/groupby/LocalGroupByExecutor.java | 5 +- .../db/query/executor/AggregationExecutor.java | 49 +++- .../iotdb/db/query/executor/FillQueryExecutor.java | 113 +++++--- .../iotdb/db/query/executor/LastQueryExecutor.java | 14 +- .../db/query/executor/RawDataQueryExecutor.java | 38 ++- .../iotdb/db/query/executor/UDFQueryExecutor.java | 4 +- .../metadata/MemAlignedChunkMetadataLoader.java | 4 +- .../chunk/metadata/MemChunkMetadataLoader.java | 4 +- .../query/reader/series/SeriesAggregateReader.java | 27 ++ .../reader/series/SeriesRawDataBatchReader.java | 1 - .../iotdb/db/query/reader/series/SeriesReader.java | 212 ++++++++++----- .../reader/series/SeriesReaderByTimestamp.java | 27 ++ .../query/timegenerator/ServerTimeGenerator.java | 60 ++++- .../db/service/basic/BasicServiceProvider.java | 3 +- .../db/service/basic/QueryFrequencyRecorder.java | 19 +- .../db/service/thrift/impl/TSServiceImpl.java | 1 + .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 5 +- .../java/org/apache/iotdb/db/utils/QueryUtils.java | 26 ++ .../db/utils/datastructure/AlignedTVList.java | 20 +- .../engine/modification/DeletionFileNodeTest.java | 44 ++- .../storagegroup/StorageGroupProcessorTest.java | 70 ++++- .../iotdb/db/engine/storagegroup/TTLTest.java | 23 +- .../engine/storagegroup/TsFileProcessorTest.java | 40 +-- .../iotdb/db/metadata/MManagerBasicTest.java | 48 +++- .../reader/series/SeriesAggregateReaderTest.java | 6 +- .../reader/series/SeriesReaderByTimestampTest.java | 6 +- site/src/main/.vuepress/config.js | 10 +- .../read/query/timegenerator/TimeGenerator.java | 3 + .../query/timegenerator/TsFileTimeGenerator.java | 6 + .../tsfile/read/reader/FakedTimeGenerator.java | 6 + 132 files changed, 2294 insertions(+), 935 deletions(-) diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java index 60d2192,0104bc4..10b6df8 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java @@@ -155,7 -155,7 +155,7 @@@ public class ClusterIoTDB implements Cl TProtocolFactory protocolFactory = ThriftServiceThread.getProtocolFactory( IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()); -- metaGroupMember = new MetaGroupMember(protocolFactory, thisNode, coordinator); ++ metaGroupMember = new MetaGroupMember(thisNode, coordinator); IoTDB.setClusterMode(); IoTDB.setMetaManager(CMManager.getInstance()); ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupMember); diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java index 0def4d0,0000000..c6729dd mode 100644,000000..100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java @@@ -1,148 -1,0 +1,154 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.log.sequencing; + +import org.apache.iotdb.cluster.log.Log; +import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest; +import org.apache.iotdb.cluster.log.VotingLog; +import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; +import org.apache.iotdb.cluster.log.manage.RaftLogManager; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; +import org.apache.iotdb.cluster.server.member.RaftMember; +import org.apache.iotdb.cluster.server.monitor.Timer; +import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class AsynchronousSequencer implements LogSequencer { + + private static final Logger logger = LoggerFactory.getLogger(AsynchronousSequencer.class); - private static final ExecutorService SEQUENCER_POOL = - IoTDBThreadPoolFactory.newCachedThreadPool("SequencerPool"); + private static final int SEQUENCER_PARALLELISM = 4; + ++ private ExecutorService sequencerPool = ++ IoTDBThreadPoolFactory.newFixedThreadPool(SEQUENCER_PARALLELISM, "SequencerPool"); ++ + private RaftMember member; + private RaftLogManager logManager; + + private BlockingQueue<SendLogRequest> unsequencedLogQueue; + + public AsynchronousSequencer(RaftMember member, RaftLogManager logManager) { + this.member = member; + this.logManager = logManager; + unsequencedLogQueue = new ArrayBlockingQueue<>(4096); + for (int i = 0; i < SEQUENCER_PARALLELISM; i++) { - SEQUENCER_POOL.submit(this::sequenceTask); ++ sequencerPool.submit(this::sequenceTask); + } + } + + public SendLogRequest enqueueSendLogRequest(Log log) { + VotingLog votingLog = member.buildVotingLog(log); + AtomicBoolean leaderShipStale = new AtomicBoolean(false); + AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get()); + + SendLogRequest request = + new SendLogRequest( + votingLog, leaderShipStale, newLeaderTerm, null, member.getAllNodes().size() / 2); + try { + unsequencedLogQueue.put(request); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while putting {}", log); + } + return request; + } + + private void sequenceLogs(List<SendLogRequest> sendLogRequests) { + long startTime; + synchronized (logManager) { + for (SendLogRequest sendLogRequest : sendLogRequests) { + Log log = sendLogRequest.getVotingLog().getLog(); + log.setCurrLogTerm(member.getTerm().get()); + log.setCurrLogIndex(logManager.getLastLogIndex() + 1); + if (log instanceof PhysicalPlanLog) { + ((PhysicalPlanLog) log).getPlan().setIndex(log.getCurrLogIndex()); + } + + startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime(); + // just like processPlanLocally,we need to check the size of log + + // logDispatcher will serialize log, and set log size, and we will use the size after it + logManager.append(log); + Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime); + + AppendEntryRequest appendEntryRequest = member.buildAppendEntryRequest(log, false); + sendLogRequest.setAppendEntryRequest(appendEntryRequest); + + startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime(); + log.setCreateTime(System.nanoTime()); + member.getVotingLogList().insert(sendLogRequest.getVotingLog()); + member.getLogDispatcher().offer(sendLogRequest); + Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime); + } + } + sendLogRequests.clear(); + } + + private void sequenceTask() { + List<SendLogRequest> sendLogRequests = new ArrayList<>(); + while (!Thread.interrupted()) { + try { + synchronized (unsequencedLogQueue) { + SendLogRequest request = unsequencedLogQueue.take(); + sendLogRequests.add(request); + unsequencedLogQueue.drainTo(sendLogRequests); + } + + sequenceLogs(sendLogRequests); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + @Override + public SendLogRequest sequence(Log log) { + return enqueueSendLogRequest(log); + } + + @Override + public void setLogManager(RaftLogManager logManager) { + this.logManager = logManager; + } + + public static class Factory implements LogSequencerFactory { + + @Override + public LogSequencer create(RaftMember member, RaftLogManager logManager) { + return new AsynchronousSequencer(member, logManager); + } + } ++ ++ @Override ++ public void close() { ++ sequencerPool.shutdownNow(); ++ } +} diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java index b418162,0000000..ded77c5 mode 100644,000000..100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencer.java @@@ -1,42 -1,0 +1,44 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.log.sequencing; + +import org.apache.iotdb.cluster.log.Log; +import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest; +import org.apache.iotdb.cluster.log.manage.RaftLogManager; + +/** + * LogSequencer assigns a unique index and associated term to a log entry and offers the entry to a + * LogDispatcher which will send the entry to followers. + */ +public interface LogSequencer { + + /** + * assigns a unique index and associated term to a log entry and offers the entry to a + * LogDispatcher which will send the entry to followers. + * + * @param log a log entry that is not yet indexed. + * @return A SendLogRequest through which the caller can monitor the status of the sending entry. + */ + SendLogRequest sequence(Log log); + + void setLogManager(RaftLogManager logManager); ++ ++ void close(); +} diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java index a749960,0000000..19b40a0 mode 100644,000000..100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java @@@ -1,115 -1,0 +1,120 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.log.sequencing; + +import org.apache.iotdb.cluster.log.Log; +import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest; +import org.apache.iotdb.cluster.log.VotingLog; +import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; +import org.apache.iotdb.cluster.log.manage.RaftLogManager; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; +import org.apache.iotdb.cluster.server.member.RaftMember; +import org.apache.iotdb.cluster.server.monitor.Timer; +import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller + * thread. + */ +public class SynchronousSequencer implements LogSequencer { + + private RaftMember member; + private RaftLogManager logManager; + + public SynchronousSequencer(RaftMember member, RaftLogManager logManager) { + this.member = member; + this.logManager = logManager; + } + + @Override + public SendLogRequest sequence(Log log) { + SendLogRequest sendLogRequest; + + long startTime = + Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime(); + synchronized (logManager) { + Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart( + startTime); + + log.setCurrLogTerm(member.getTerm().get()); + log.setCurrLogIndex(logManager.getLastLogIndex() + 1); + + if (log instanceof PhysicalPlanLog) { + PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log; + physicalPlanLog.getPlan().setIndex(log.getCurrLogIndex()); + } + + startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime(); + + // logDispatcher will serialize log, and set log size, and we will use the size after it + logManager.append(log); + Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime); + + startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime(); + sendLogRequest = buildSendLogRequest(log); + Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime); + + startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime(); + log.setCreateTime(System.nanoTime()); + member.getVotingLogList().insert(sendLogRequest.getVotingLog()); + member.getLogDispatcher().offer(sendLogRequest); + Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime); + } + return sendLogRequest; + } + + @Override + public void setLogManager(RaftLogManager logManager) { + this.logManager = logManager; + } + + private SendLogRequest buildSendLogRequest(Log log) { + VotingLog votingLog = member.buildVotingLog(log); + AtomicBoolean leaderShipStale = new AtomicBoolean(false); + AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get()); + + long startTime = Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime(); + AppendEntryRequest appendEntryRequest = member.buildAppendEntryRequest(log, false); + Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime); + + return new SendLogRequest( + votingLog, + leaderShipStale, + newLeaderTerm, + appendEntryRequest, + member.getAllNodes().size() / 2); + } + + public static class Factory implements LogSequencerFactory { + + @Override + public LogSequencer create(RaftMember member, RaftLogManager logManager) { + return new SynchronousSequencer(member, logManager); + } + } ++ ++ @Override ++ public void close() { ++ ++ } +} diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 533adf8,36ee967..2a80781 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@@ -170,7 -170,7 +170,7 @@@ public class DataGroupMember extends Ra private LastAppliedPatitionTableVersion lastAppliedPartitionTableVersion; @TestOnly -- public DataGroupMember(PartitionGroup nodes) { ++ public DataGroupMember(Node thisNode, PartitionGroup nodes) { // constructor for test this.name = "Data-" @@@ -180,7 -180,7 +180,8 @@@ + "-raftId-" + nodes.getRaftId() + ""; - allNodes = nodes; ++ setThisNode(thisNode); + setAllNodes(nodes); mbeanName = String.format( "%s:%s=%s%d", @@@ -191,13 -191,9 +192,14 @@@ setQueryManager(new ClusterQueryManager()); localQueryExecutor = new LocalQueryExecutor(this); lastAppliedPartitionTableVersion = new LastAppliedPatitionTableVersion(getMemberDir()); + appenderFactory = + ClusterDescriptor.getInstance().getConfig().isUseFollowerSlidingWindow() + ? new SlidingWindowLogAppender.Factory() + : new BlockingLogAppender.Factory(); ++ logSequencer = SEQUENCER_FACTORY.create(this, logManager); } - DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, MetaGroupMember metaGroupMember) { + DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember metaGroupMember) { // The name is used in JMX, so we have to avoid to use "(" "," "=" ")" super( "Data-" diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 4764895,f057131..fc2ddc0 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@@ -209,13 -210,13 +209,14 @@@ public class MetaGroupMember extends Ra @TestOnly public MetaGroupMember() {} -- public MetaGroupMember(TProtocolFactory factory, Node thisNode, Coordinator coordinator) { ++ public MetaGroupMember(Node thisNode, Coordinator coordinator) { super( "Meta", new ClientManager( ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(), ClientManager.Type.MetaGroupClient)); - allNodes = new PartitionGroup(); ++ setThisNode(thisNode); + setAllNodes(new PartitionGroup()); initPeerMap(); // committed logs are applied to the state machine (the IoTDB instance) through the applier @@@ -225,7 -225,7 +226,6 @@@ term.set(logManager.getHardState().getCurrentTerm()); voteFor = logManager.getHardState().getVoteFor(); -- setThisNode(thisNode); // load the identifier from the disk or generate a new one loadIdentifier(); allNodes.add(thisNode); diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 4d69153,d74579c..ea3b463 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@@ -356,6 -304,10 +356,13 @@@ public abstract class RaftMember implem public void stop() { setSkipElection(true); closeLogManager(); + if (clientManager != null) { + clientManager.close(); + } ++ if (logSequencer != null) { ++ logSequencer.close(); ++ } + if (heartBeatService == null) { return; } @@@ -772,20 -652,6 +779,24 @@@ public void setAllNodes(PartitionGroup allNodes) { this.allNodes = allNodes; ++ if (allNodes.isEmpty()) { ++ return; ++ } ++ + this.votingLogList = new VotingLogList(allNodes.size() / 2); + + // update the reference of thisNode to keep consistency + boolean foundThisNode = false; + for (Node node : allNodes) { + if (ClusterUtils.isNodeEquals(node, thisNode)) { + thisNode = node; + foundThisNode = true; + break; + } + } + if (!foundThisNode) { + logger.error("{}: did not find this node {}, in the raft group {}", name, thisNode, allNodes); + } } public Map<Node, Long> getLastCatchUpResponseTime() { @@@ -1661,29 -1516,18 +1672,28 @@@ * one follower tells the node that it is no longer a valid leader, or a timeout is triggered. */ @SuppressWarnings({"java:S2445"}) // safe synchronized - private AppendLogResult waitAppendResult( - AtomicInteger voteCounter, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm) { + protected AppendLogResult waitAppendResult( + VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) { // wait for the followers to vote long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime(); - synchronized (voteCounter) { + long nextTimeToPrint = 15000; + + int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size(); + int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size(); + int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum; + + synchronized (log) { long waitStart = System.currentTimeMillis(); long alreadyWait = 0; - while ( - log.getLog().getCurrLogIndex() == -1 || - stronglyAcceptedNodeNum < quorumSize - && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog())) - || (totalAccepted < quorumSize) - || votingLogList.size() > config.getMaxNumOfLogsInMem()) - while (voteCounter.get() > 0 -- && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS() - && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) { - && voteCounter.get() != Integer.MAX_VALUE) { ++ while (log.getLog().getCurrLogIndex() == -1 ++ || stronglyAcceptedNodeNum < quorumSize ++ && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog())) ++ || (totalAccepted < quorumSize) ++ || votingLogList.size() > config.getMaxNumOfLogsInMem()) ++ && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS() ++ && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) { try { - voteCounter.wait(ClusterConstant.getWriteOperationTimeoutMS()); + log.wait(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Unexpected interruption when sending a log", e); diff --cc cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java index 6056fff,6056fff..4df6a90 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java @@@ -31,14 -31,14 +31,14 @@@ import java.util.Collections public class TestDataGroupMember extends DataGroupMember { public TestDataGroupMember() { -- super(new PartitionGroup(Collections.singletonList(TestUtils.getNode(0)))); ++ super(TestUtils.getNode(0), ++ new PartitionGroup(Collections.singletonList(TestUtils.getNode(0)))); setQueryManager(new ClusterQueryManager()); this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null, ""); } public TestDataGroupMember(Node thisNode, PartitionGroup allNodes) { -- super(allNodes); -- this.thisNode = thisNode; ++ super(thisNode, allNodes); this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null, ""); setQueryManager(new ClusterQueryManager()); } diff --cc cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java index 8320ea7,761f5e5..c0bd588 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java @@@ -33,16 -35,5 +33,17 @@@ public class TestMetaGroupMember extend MetaSingleSnapshotLogManager manager = new MetaSingleSnapshotLogManager(new TestLogApplier(), this); setLogManager(manager); + + PartitionGroup group = new PartitionGroup(); + thisNode = TestUtils.getNode(0); + for (int i = 0; i < 10; i++) { + group.add(TestUtils.getNode(i)); + } + setAllNodes(group); + - this.clientManager = new ClientManager( - ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(), - ClientManager.Type.MetaGroupClient); ++ this.clientManager = ++ new ClientManager( ++ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(), ++ ClientManager.Type.MetaGroupClient); } } diff --cc cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java index 1ff27b5,31f553f..2f7b90c --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java @@@ -19,9 -19,6 +19,8 @@@ package org.apache.iotdb.cluster.log; - import org.apache.iotdb.cluster.client.ClientCategory; +import org.apache.iotdb.cluster.client.ClientManager; +import org.apache.iotdb.cluster.client.ClientManager.Type; import org.apache.iotdb.cluster.common.TestAsyncClient; import org.apache.iotdb.cluster.common.TestMetaGroupMember; import org.apache.iotdb.cluster.common.TestSyncClient; @@@ -110,38 -104,6 +109,41 @@@ public class LogDispatcherTest } @Override + public AsyncClient getAsyncClient(Node node) { + return new TestAsyncClient() { + + @Override - public void appendEntry(AppendEntryRequest request, - AsyncMethodCallback<AppendEntryResult> resultHandler) throws TException { ++ public void appendEntry( ++ AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) ++ throws TException { + try { + if (!downNode.contains(node)) { + resultHandler.onComplete(mockedAppendEntry(request)); + } + resultHandler.onComplete(new AppendEntryResult(-1)); + } catch (UnknownLogTypeException e) { + throw new TException(e); + } + } + + @Override - public void appendEntries(AppendEntriesRequest request, - AsyncMethodCallback<AppendEntryResult> resultHandler) throws TException { ++ public void appendEntries( ++ AppendEntriesRequest request, ++ AsyncMethodCallback<AppendEntryResult> resultHandler) ++ throws TException { + try { + if (!downNode.contains(node)) { + resultHandler.onComplete(mockedAppendEntries(request)); + } + resultHandler.onComplete(new AppendEntryResult(-1)); + } catch (UnknownLogTypeException e) { + throw new TException(e); + } + } + }; + } + + @Override public Client getSyncClient(Node node) { return new TestSyncClient() { @Override diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index b2f8e01,34c1458..7eb96a3 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@@ -94,7 -93,7 +94,6 @@@ import org.apache.iotdb.tsfile.write.sc import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.apache.thrift.async.AsyncMethodCallback; --import org.apache.thrift.protocol.TCompactProtocol.Factory; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 4e4f82f,736b269..99aa6d3 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@@ -348,9 -354,9 +351,9 @@@ public class MetaGroupMemberTest extend } @Override -- protected MetaGroupMember getMetaGroupMember(Node node) throws QueryProcessException { ++ protected MetaGroupMember getMetaGroupMember(Node node) { MetaGroupMember metaGroupMember = -- new MetaGroupMember(new Factory(), node, new Coordinator()) { ++ new MetaGroupMember(node, new Coordinator()) { @Override public void applyAddNode(AddNodeLog addNodeLog) { @@@ -809,7 -883,7 +812,7 @@@ System.out.println("Start testProcessNonQuery()"); mockDataClusterServer = true; -- MetaGroupMember testMetaMember2 = getMetaGroupMember(TestUtils.getNode(2)); ++ MetaGroupMember testMetaMember2 = getMetaGroupMember(TestUtils.getNode(20)); testMetaMember2.setCharacter(LEADER); // as a follower
