This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch consensus_module_refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/consensus_module_refactor by
this push:
new 9ca59ee3b7d refactor data/schema region singleton
9ca59ee3b7d is described below
commit 9ca59ee3b7d4b7db5f982ea67778a6be1f531a57
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Wed Aug 16 20:06:03 2023 +0800
refactor data/schema region singleton
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../db/consensus/DataRegionConsensusImpl.java | 284 ++++++++++-----------
.../db/consensus/SchemaRegionConsensusImpl.java | 201 +++++++--------
2 files changed, 234 insertions(+), 251 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 1e9ec1722d4..03963c8b359 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -48,162 +48,150 @@ import java.util.concurrent.TimeUnit;
*/
public class DataRegionConsensusImpl {
- private static final IoTDBConfig conf =
IoTDBDescriptor.getInstance().getConfig();
-
- private static IConsensus INSTANCE = null;
-
private DataRegionConsensusImpl() {
// do nothing
}
- // need to create instance before calling this method
public static IConsensus getInstance() {
- return INSTANCE;
+ return DataRegionConsensusImplHolder.INSTANCE;
}
- public static synchronized IConsensus setupAndGetInstance() {
- if (INSTANCE == null) {
- INSTANCE =
- ConsensusFactory.getConsensusImpl(
- conf.getDataRegionConsensusProtocolClass(),
- ConsensusConfig.newBuilder()
- .setThisNodeId(conf.getDataNodeId())
- .setThisNode(
- new TEndPoint(
- conf.getInternalAddress(),
conf.getDataRegionConsensusPort()))
- .setStorageDir(conf.getDataRegionConsensusDir())
- .setConsensusGroupType(TConsensusGroupType.DataRegion)
- .setIoTConsensusConfig(
- IoTConsensusConfig.newBuilder()
- .setRpc(
- RPC.newBuilder()
-
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
-
.setRpcSelectorThreadNum(conf.getRpcSelectorThreadCount())
- .setRpcMinConcurrentClientNum(
- conf.getRpcMinConcurrentClientNum())
- .setRpcMaxConcurrentClientNum(
- conf.getRpcMaxConcurrentClientNum())
- .setRpcThriftCompressionEnabled(
- conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfClientManager(
- conf.getSelectorNumOfClientManager())
- .setThriftServerAwaitTimeForStopService(
-
conf.getThriftServerAwaitTimeForStopService())
-
.setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
- .setCoreClientNumForEachNode(
- conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build())
- .setReplication(
- IoTConsensusConfig.Replication.newBuilder()
-
.setWalThrottleThreshold(conf.getThrottleThreshold())
- .setAllocateMemoryForConsensus(
- conf.getAllocateMemoryForConsensus())
- .setMaxLogEntriesNumPerBatch(
- conf.getMaxLogEntriesNumPerBatch())
-
.setMaxSizePerBatch(conf.getMaxSizePerBatch())
-
.setMaxPendingBatchesNum(conf.getMaxPendingBatchesNum())
-
.setMaxMemoryRatioForQueue(conf.getMaxMemoryRatioForQueue())
- .build())
- .build())
- .setRatisConfig(
- RatisConfig.newBuilder()
- // An empty log is committed after each restart,
even if no data is
- // written. This setting ensures that compaction
work is not discarded
- // even if there are frequent restarts
- .setSnapshot(
- Snapshot.newBuilder()
- .setCreationGap(1)
- .setAutoTriggerThreshold(
-
conf.getDataRatisConsensusSnapshotTriggerThreshold())
- .build())
- .setLog(
- RatisConfig.Log.newBuilder()
- .setUnsafeFlushEnabled(
-
conf.isDataRatisConsensusLogUnsafeFlushEnable())
-
.setForceSyncNum(conf.getDataRatisConsensusLogForceSyncNum())
- .setSegmentSizeMax(
- SizeInBytes.valueOf(
-
conf.getDataRatisConsensusLogSegmentSizeMax()))
- .setPreserveNumsWhenPurge(
-
conf.getDataRatisConsensusPreserveWhenPurge())
- .build())
- .setGrpc(
- RatisConfig.Grpc.newBuilder()
- .setFlowControlWindow(
- SizeInBytes.valueOf(
-
conf.getDataRatisConsensusGrpcFlowControlWindow()))
- .setLeaderOutstandingAppendsMax(
- conf
-
.getDataRatisConsensusGrpcLeaderOutstandingAppendsMax())
- .build())
- .setRpc(
- RatisConfig.Rpc.newBuilder()
- .setTimeoutMin(
- TimeDuration.valueOf(
- conf
-
.getDataRatisConsensusLeaderElectionTimeoutMinMs(),
- TimeUnit.MILLISECONDS))
- .setTimeoutMax(
- TimeDuration.valueOf(
- conf
-
.getDataRatisConsensusLeaderElectionTimeoutMaxMs(),
- TimeUnit.MILLISECONDS))
- .setRequestTimeout(
- TimeDuration.valueOf(
-
conf.getDataRatisConsensusRequestTimeoutMs(),
- TimeUnit.MILLISECONDS))
- .setFirstElectionTimeoutMin(
- TimeDuration.valueOf(
-
conf.getRatisFirstElectionTimeoutMinMs(),
- TimeUnit.MILLISECONDS))
- .setFirstElectionTimeoutMax(
- TimeDuration.valueOf(
-
conf.getRatisFirstElectionTimeoutMaxMs(),
- TimeUnit.MILLISECONDS))
- .build())
- .setClient(
- RatisConfig.Client.newBuilder()
- .setClientRequestTimeoutMillis(
-
conf.getDataRatisConsensusRequestTimeoutMs())
- .setClientMaxRetryAttempt(
-
conf.getDataRatisConsensusMaxRetryAttempts())
- .setClientRetryInitialSleepTimeMs(
-
conf.getDataRatisConsensusInitialSleepTimeMs())
- .setClientRetryMaxSleepTimeMs(
-
conf.getDataRatisConsensusMaxSleepTimeMs())
- .setCoreClientNumForEachNode(
- conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build())
- .setImpl(
- RatisConfig.Impl.newBuilder()
-
.setTriggerSnapshotFileSize(conf.getDataRatisLogMax())
- .build())
- .setLeaderLogAppender(
- RatisConfig.LeaderLogAppender.newBuilder()
- .setBufferByteLimit(
-
conf.getDataRatisConsensusLogAppenderBufferSizeMax())
- .build())
- .build())
- .build(),
- DataRegionConsensusImpl::createDataRegionStateMachine)
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- String.format(
- ConsensusFactory.CONSTRUCT_FAILED_MSG,
- conf.getDataRegionConsensusProtocolClass())));
- }
- return INSTANCE;
- }
+ private static class DataRegionConsensusImplHolder {
+
+ private static final IoTDBConfig CONF =
IoTDBDescriptor.getInstance().getConfig();
+
+ private static final IConsensus INSTANCE =
+ ConsensusFactory.getConsensusImpl(
+ CONF.getDataRegionConsensusProtocolClass(),
+ ConsensusConfig.newBuilder()
+ .setThisNodeId(CONF.getDataNodeId())
+ .setThisNode(
+ new TEndPoint(CONF.getInternalAddress(),
CONF.getDataRegionConsensusPort()))
+ .setStorageDir(CONF.getDataRegionConsensusDir())
+ .setConsensusGroupType(TConsensusGroupType.DataRegion)
+ .setIoTConsensusConfig(
+ IoTConsensusConfig.newBuilder()
+ .setRpc(
+ RPC.newBuilder()
+
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
+
.setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
+ .setRpcMinConcurrentClientNum(
+ CONF.getRpcMinConcurrentClientNum())
+ .setRpcMaxConcurrentClientNum(
+ CONF.getRpcMaxConcurrentClientNum())
+ .setRpcThriftCompressionEnabled(
+ CONF.isRpcThriftCompressionEnable())
+ .setSelectorNumOfClientManager(
+ CONF.getSelectorNumOfClientManager())
+ .setThriftServerAwaitTimeForStopService(
+
CONF.getThriftServerAwaitTimeForStopService())
+
.setThriftMaxFrameSize(CONF.getThriftMaxFrameSize())
+
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+ .build())
+ .setReplication(
+ IoTConsensusConfig.Replication.newBuilder()
+
.setWalThrottleThreshold(CONF.getThrottleThreshold())
+ .setAllocateMemoryForConsensus(
+ CONF.getAllocateMemoryForConsensus())
+
.setMaxLogEntriesNumPerBatch(CONF.getMaxLogEntriesNumPerBatch())
+
.setMaxSizePerBatch(CONF.getMaxSizePerBatch())
+
.setMaxPendingBatchesNum(CONF.getMaxPendingBatchesNum())
+
.setMaxMemoryRatioForQueue(CONF.getMaxMemoryRatioForQueue())
+ .build())
+ .build())
+ .setRatisConfig(
+ RatisConfig.newBuilder()
+ // An empty log is committed after each restart,
even if no data is
+ // written. This setting ensures that compaction
work is not discarded
+ // even if there are frequent restarts
+ .setSnapshot(
+ Snapshot.newBuilder()
+ .setCreationGap(1)
+ .setAutoTriggerThreshold(
+
CONF.getDataRatisConsensusSnapshotTriggerThreshold())
+ .build())
+ .setLog(
+ RatisConfig.Log.newBuilder()
+ .setUnsafeFlushEnabled(
+
CONF.isDataRatisConsensusLogUnsafeFlushEnable())
+
.setForceSyncNum(CONF.getDataRatisConsensusLogForceSyncNum())
+ .setSegmentSizeMax(
+ SizeInBytes.valueOf(
+
CONF.getDataRatisConsensusLogSegmentSizeMax()))
+ .setPreserveNumsWhenPurge(
+
CONF.getDataRatisConsensusPreserveWhenPurge())
+ .build())
+ .setGrpc(
+ RatisConfig.Grpc.newBuilder()
+ .setFlowControlWindow(
+ SizeInBytes.valueOf(
+
CONF.getDataRatisConsensusGrpcFlowControlWindow()))
+ .setLeaderOutstandingAppendsMax(
+
CONF.getDataRatisConsensusGrpcLeaderOutstandingAppendsMax())
+ .build())
+ .setRpc(
+ RatisConfig.Rpc.newBuilder()
+ .setTimeoutMin(
+ TimeDuration.valueOf(
+
CONF.getDataRatisConsensusLeaderElectionTimeoutMinMs(),
+ TimeUnit.MILLISECONDS))
+ .setTimeoutMax(
+ TimeDuration.valueOf(
+
CONF.getDataRatisConsensusLeaderElectionTimeoutMaxMs(),
+ TimeUnit.MILLISECONDS))
+ .setRequestTimeout(
+ TimeDuration.valueOf(
+
CONF.getDataRatisConsensusRequestTimeoutMs(),
+ TimeUnit.MILLISECONDS))
+ .setFirstElectionTimeoutMin(
+ TimeDuration.valueOf(
+
CONF.getRatisFirstElectionTimeoutMinMs(),
+ TimeUnit.MILLISECONDS))
+ .setFirstElectionTimeoutMax(
+ TimeDuration.valueOf(
+
CONF.getRatisFirstElectionTimeoutMaxMs(),
+ TimeUnit.MILLISECONDS))
+ .build())
+ .setClient(
+ RatisConfig.Client.newBuilder()
+ .setClientRequestTimeoutMillis(
+
CONF.getDataRatisConsensusRequestTimeoutMs())
+ .setClientMaxRetryAttempt(
+
CONF.getDataRatisConsensusMaxRetryAttempts())
+ .setClientRetryInitialSleepTimeMs(
+
CONF.getDataRatisConsensusInitialSleepTimeMs())
+ .setClientRetryMaxSleepTimeMs(
+
CONF.getDataRatisConsensusMaxSleepTimeMs())
+
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+ .build())
+ .setImpl(
+ RatisConfig.Impl.newBuilder()
+
.setTriggerSnapshotFileSize(CONF.getDataRatisLogMax())
+ .build())
+ .setLeaderLogAppender(
+ RatisConfig.LeaderLogAppender.newBuilder()
+ .setBufferByteLimit(
+
CONF.getDataRatisConsensusLogAppenderBufferSizeMax())
+ .build())
+ .build())
+ .build(),
+ DataRegionConsensusImplHolder::createDataRegionStateMachine)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ CONF.getDataRegionConsensusProtocolClass())));
- private static DataRegionStateMachine
createDataRegionStateMachine(ConsensusGroupId gid) {
- DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
- if
(ConsensusFactory.IOT_CONSENSUS.equals(conf.getDataRegionConsensusProtocolClass()))
{
- return new IoTConsensusDataRegionStateMachine(dataRegion);
- } else {
- return new DataRegionStateMachine(dataRegion);
+ private static DataRegionStateMachine
createDataRegionStateMachine(ConsensusGroupId gid) {
+ DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
+ if
(ConsensusFactory.IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass()))
{
+ return new IoTConsensusDataRegionStateMachine(dataRegion);
+ } else {
+ return new DataRegionStateMachine(dataRegion);
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index a38b9dde4bc..0e39d28c66b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -42,112 +42,107 @@ import java.util.concurrent.TimeUnit;
*/
public class SchemaRegionConsensusImpl {
- private static final IoTDBConfig conf =
IoTDBDescriptor.getInstance().getConfig();
-
- private static IConsensus INSTANCE = null;
-
- private SchemaRegionConsensusImpl() {}
+ private SchemaRegionConsensusImpl() {
+ // do nothing
+ }
- // need to create instance before calling this method
public static IConsensus getInstance() {
- return INSTANCE;
+ return SchemaRegionConsensusImplHolder.INSTANCE;
}
- public static synchronized IConsensus setupAndGetInstance() {
- if (INSTANCE == null) {
- INSTANCE =
- ConsensusFactory.getConsensusImpl(
- conf.getSchemaRegionConsensusProtocolClass(),
- ConsensusConfig.newBuilder()
- .setThisNodeId(conf.getDataNodeId())
- .setThisNode(
- new TEndPoint(
- conf.getInternalAddress(),
conf.getSchemaRegionConsensusPort()))
- .setConsensusGroupType(TConsensusGroupType.SchemaRegion)
- .setRatisConfig(
- RatisConfig.newBuilder()
- .setSnapshot(
- RatisConfig.Snapshot.newBuilder()
- .setAutoTriggerThreshold(
-
conf.getSchemaRatisConsensusSnapshotTriggerThreshold())
- .build())
- .setLog(
- RatisConfig.Log.newBuilder()
- .setUnsafeFlushEnabled(
-
conf.isSchemaRatisConsensusLogUnsafeFlushEnable())
- .setSegmentSizeMax(
- SizeInBytes.valueOf(
-
conf.getSchemaRatisConsensusLogSegmentSizeMax()))
- .setPreserveNumsWhenPurge(
-
conf.getSchemaRatisConsensusPreserveWhenPurge())
- .build())
- .setGrpc(
- RatisConfig.Grpc.newBuilder()
- .setFlowControlWindow(
- SizeInBytes.valueOf(
-
conf.getSchemaRatisConsensusGrpcFlowControlWindow()))
- .build())
- .setRpc(
- RatisConfig.Rpc.newBuilder()
- .setTimeoutMin(
- TimeDuration.valueOf(
- conf
-
.getSchemaRatisConsensusLeaderElectionTimeoutMinMs(),
- TimeUnit.MILLISECONDS))
- .setTimeoutMax(
- TimeDuration.valueOf(
- conf
-
.getSchemaRatisConsensusLeaderElectionTimeoutMaxMs(),
- TimeUnit.MILLISECONDS))
- .setRequestTimeout(
- TimeDuration.valueOf(
-
conf.getSchemaRatisConsensusRequestTimeoutMs(),
- TimeUnit.MILLISECONDS))
- .setFirstElectionTimeoutMin(
- TimeDuration.valueOf(
-
conf.getRatisFirstElectionTimeoutMinMs(),
- TimeUnit.MILLISECONDS))
- .setFirstElectionTimeoutMax(
- TimeDuration.valueOf(
-
conf.getRatisFirstElectionTimeoutMaxMs(),
- TimeUnit.MILLISECONDS))
- .build())
- .setClient(
- RatisConfig.Client.newBuilder()
- .setClientRequestTimeoutMillis(
-
conf.getDataRatisConsensusRequestTimeoutMs())
- .setClientMaxRetryAttempt(
-
conf.getDataRatisConsensusMaxRetryAttempts())
- .setClientRetryInitialSleepTimeMs(
-
conf.getDataRatisConsensusInitialSleepTimeMs())
- .setClientRetryMaxSleepTimeMs(
-
conf.getDataRatisConsensusMaxSleepTimeMs())
- .setCoreClientNumForEachNode(
- conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build())
- .setImpl(
- RatisConfig.Impl.newBuilder()
-
.setTriggerSnapshotFileSize(conf.getSchemaRatisLogMax())
- .build())
- .setLeaderLogAppender(
- RatisConfig.LeaderLogAppender.newBuilder()
- .setBufferByteLimit(
-
conf.getSchemaRatisConsensusLogAppenderBufferSizeMax())
- .build())
- .build())
- .setStorageDir(conf.getSchemaRegionConsensusDir())
- .build(),
- gid ->
- new SchemaRegionStateMachine(
-
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- String.format(
- ConsensusFactory.CONSTRUCT_FAILED_MSG,
- conf.getSchemaRegionConsensusProtocolClass())));
- }
- return INSTANCE;
+ private static class SchemaRegionConsensusImplHolder {
+
+ private static final IoTDBConfig CONF =
IoTDBDescriptor.getInstance().getConfig();
+ private static final IConsensus INSTANCE =
+ ConsensusFactory.getConsensusImpl(
+ CONF.getSchemaRegionConsensusProtocolClass(),
+ ConsensusConfig.newBuilder()
+ .setThisNodeId(CONF.getDataNodeId())
+ .setThisNode(
+ new TEndPoint(
+ CONF.getInternalAddress(),
CONF.getSchemaRegionConsensusPort()))
+ .setConsensusGroupType(TConsensusGroupType.SchemaRegion)
+ .setRatisConfig(
+ RatisConfig.newBuilder()
+ .setSnapshot(
+ RatisConfig.Snapshot.newBuilder()
+ .setAutoTriggerThreshold(
+
CONF.getSchemaRatisConsensusSnapshotTriggerThreshold())
+ .build())
+ .setLog(
+ RatisConfig.Log.newBuilder()
+ .setUnsafeFlushEnabled(
+
CONF.isSchemaRatisConsensusLogUnsafeFlushEnable())
+ .setSegmentSizeMax(
+ SizeInBytes.valueOf(
+
CONF.getSchemaRatisConsensusLogSegmentSizeMax()))
+ .setPreserveNumsWhenPurge(
+
CONF.getSchemaRatisConsensusPreserveWhenPurge())
+ .build())
+ .setGrpc(
+ RatisConfig.Grpc.newBuilder()
+ .setFlowControlWindow(
+ SizeInBytes.valueOf(
+
CONF.getSchemaRatisConsensusGrpcFlowControlWindow()))
+ .build())
+ .setRpc(
+ RatisConfig.Rpc.newBuilder()
+ .setTimeoutMin(
+ TimeDuration.valueOf(
+ CONF
+
.getSchemaRatisConsensusLeaderElectionTimeoutMinMs(),
+ TimeUnit.MILLISECONDS))
+ .setTimeoutMax(
+ TimeDuration.valueOf(
+ CONF
+
.getSchemaRatisConsensusLeaderElectionTimeoutMaxMs(),
+ TimeUnit.MILLISECONDS))
+ .setRequestTimeout(
+ TimeDuration.valueOf(
+
CONF.getSchemaRatisConsensusRequestTimeoutMs(),
+ TimeUnit.MILLISECONDS))
+ .setFirstElectionTimeoutMin(
+ TimeDuration.valueOf(
+
CONF.getRatisFirstElectionTimeoutMinMs(),
+ TimeUnit.MILLISECONDS))
+ .setFirstElectionTimeoutMax(
+ TimeDuration.valueOf(
+
CONF.getRatisFirstElectionTimeoutMaxMs(),
+ TimeUnit.MILLISECONDS))
+ .build())
+ .setClient(
+ RatisConfig.Client.newBuilder()
+ .setClientRequestTimeoutMillis(
+
CONF.getDataRatisConsensusRequestTimeoutMs())
+ .setClientMaxRetryAttempt(
+
CONF.getDataRatisConsensusMaxRetryAttempts())
+ .setClientRetryInitialSleepTimeMs(
+
CONF.getDataRatisConsensusInitialSleepTimeMs())
+ .setClientRetryMaxSleepTimeMs(
+
CONF.getDataRatisConsensusMaxSleepTimeMs())
+
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+ .build())
+ .setImpl(
+ RatisConfig.Impl.newBuilder()
+
.setTriggerSnapshotFileSize(CONF.getSchemaRatisLogMax())
+ .build())
+ .setLeaderLogAppender(
+ RatisConfig.LeaderLogAppender.newBuilder()
+ .setBufferByteLimit(
+
CONF.getSchemaRatisConsensusLogAppenderBufferSizeMax())
+ .build())
+ .build())
+ .setStorageDir(CONF.getSchemaRegionConsensusDir())
+ .build(),
+ gid ->
+ new SchemaRegionStateMachine(
+
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ CONF.getSchemaRegionConsensusProtocolClass())));
}
}