This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch consensus_module_refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/consensus_module_refactor by 
this push:
     new 9ca59ee3b7d refactor data/schema region singleton
9ca59ee3b7d is described below

commit 9ca59ee3b7d4b7db5f982ea67778a6be1f531a57
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Wed Aug 16 20:06:03 2023 +0800

    refactor data/schema region singleton
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../db/consensus/DataRegionConsensusImpl.java      | 284 ++++++++++-----------
 .../db/consensus/SchemaRegionConsensusImpl.java    | 201 +++++++--------
 2 files changed, 234 insertions(+), 251 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 1e9ec1722d4..03963c8b359 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -48,162 +48,150 @@ import java.util.concurrent.TimeUnit;
  */
 public class DataRegionConsensusImpl {
 
-  private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
-
-  private static IConsensus INSTANCE = null;
-
   private DataRegionConsensusImpl() {
     // do nothing
   }
 
-  // need to create instance before calling this method
   public static IConsensus getInstance() {
-    return INSTANCE;
+    return DataRegionConsensusImplHolder.INSTANCE;
   }
 
-  public static synchronized IConsensus setupAndGetInstance() {
-    if (INSTANCE == null) {
-      INSTANCE =
-          ConsensusFactory.getConsensusImpl(
-                  conf.getDataRegionConsensusProtocolClass(),
-                  ConsensusConfig.newBuilder()
-                      .setThisNodeId(conf.getDataNodeId())
-                      .setThisNode(
-                          new TEndPoint(
-                              conf.getInternalAddress(), 
conf.getDataRegionConsensusPort()))
-                      .setStorageDir(conf.getDataRegionConsensusDir())
-                      .setConsensusGroupType(TConsensusGroupType.DataRegion)
-                      .setIoTConsensusConfig(
-                          IoTConsensusConfig.newBuilder()
-                              .setRpc(
-                                  RPC.newBuilder()
-                                      
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
-                                      
.setRpcSelectorThreadNum(conf.getRpcSelectorThreadCount())
-                                      .setRpcMinConcurrentClientNum(
-                                          conf.getRpcMinConcurrentClientNum())
-                                      .setRpcMaxConcurrentClientNum(
-                                          conf.getRpcMaxConcurrentClientNum())
-                                      .setRpcThriftCompressionEnabled(
-                                          conf.isRpcThriftCompressionEnable())
-                                      .setSelectorNumOfClientManager(
-                                          conf.getSelectorNumOfClientManager())
-                                      .setThriftServerAwaitTimeForStopService(
-                                          
conf.getThriftServerAwaitTimeForStopService())
-                                      
.setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
-                                      .setCoreClientNumForEachNode(
-                                          conf.getCoreClientNumForEachNode())
-                                      
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                                      .build())
-                              .setReplication(
-                                  IoTConsensusConfig.Replication.newBuilder()
-                                      
.setWalThrottleThreshold(conf.getThrottleThreshold())
-                                      .setAllocateMemoryForConsensus(
-                                          conf.getAllocateMemoryForConsensus())
-                                      .setMaxLogEntriesNumPerBatch(
-                                          conf.getMaxLogEntriesNumPerBatch())
-                                      
.setMaxSizePerBatch(conf.getMaxSizePerBatch())
-                                      
.setMaxPendingBatchesNum(conf.getMaxPendingBatchesNum())
-                                      
.setMaxMemoryRatioForQueue(conf.getMaxMemoryRatioForQueue())
-                                      .build())
-                              .build())
-                      .setRatisConfig(
-                          RatisConfig.newBuilder()
-                              // An empty log is committed after each restart, 
even if no data is
-                              // written. This setting ensures that compaction 
work is not discarded
-                              // even if there are frequent restarts
-                              .setSnapshot(
-                                  Snapshot.newBuilder()
-                                      .setCreationGap(1)
-                                      .setAutoTriggerThreshold(
-                                          
conf.getDataRatisConsensusSnapshotTriggerThreshold())
-                                      .build())
-                              .setLog(
-                                  RatisConfig.Log.newBuilder()
-                                      .setUnsafeFlushEnabled(
-                                          
conf.isDataRatisConsensusLogUnsafeFlushEnable())
-                                      
.setForceSyncNum(conf.getDataRatisConsensusLogForceSyncNum())
-                                      .setSegmentSizeMax(
-                                          SizeInBytes.valueOf(
-                                              
conf.getDataRatisConsensusLogSegmentSizeMax()))
-                                      .setPreserveNumsWhenPurge(
-                                          
conf.getDataRatisConsensusPreserveWhenPurge())
-                                      .build())
-                              .setGrpc(
-                                  RatisConfig.Grpc.newBuilder()
-                                      .setFlowControlWindow(
-                                          SizeInBytes.valueOf(
-                                              
conf.getDataRatisConsensusGrpcFlowControlWindow()))
-                                      .setLeaderOutstandingAppendsMax(
-                                          conf
-                                              
.getDataRatisConsensusGrpcLeaderOutstandingAppendsMax())
-                                      .build())
-                              .setRpc(
-                                  RatisConfig.Rpc.newBuilder()
-                                      .setTimeoutMin(
-                                          TimeDuration.valueOf(
-                                              conf
-                                                  
.getDataRatisConsensusLeaderElectionTimeoutMinMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setTimeoutMax(
-                                          TimeDuration.valueOf(
-                                              conf
-                                                  
.getDataRatisConsensusLeaderElectionTimeoutMaxMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setRequestTimeout(
-                                          TimeDuration.valueOf(
-                                              
conf.getDataRatisConsensusRequestTimeoutMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setFirstElectionTimeoutMin(
-                                          TimeDuration.valueOf(
-                                              
conf.getRatisFirstElectionTimeoutMinMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setFirstElectionTimeoutMax(
-                                          TimeDuration.valueOf(
-                                              
conf.getRatisFirstElectionTimeoutMaxMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .build())
-                              .setClient(
-                                  RatisConfig.Client.newBuilder()
-                                      .setClientRequestTimeoutMillis(
-                                          
conf.getDataRatisConsensusRequestTimeoutMs())
-                                      .setClientMaxRetryAttempt(
-                                          
conf.getDataRatisConsensusMaxRetryAttempts())
-                                      .setClientRetryInitialSleepTimeMs(
-                                          
conf.getDataRatisConsensusInitialSleepTimeMs())
-                                      .setClientRetryMaxSleepTimeMs(
-                                          
conf.getDataRatisConsensusMaxSleepTimeMs())
-                                      .setCoreClientNumForEachNode(
-                                          conf.getCoreClientNumForEachNode())
-                                      
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                                      .build())
-                              .setImpl(
-                                  RatisConfig.Impl.newBuilder()
-                                      
.setTriggerSnapshotFileSize(conf.getDataRatisLogMax())
-                                      .build())
-                              .setLeaderLogAppender(
-                                  RatisConfig.LeaderLogAppender.newBuilder()
-                                      .setBufferByteLimit(
-                                          
conf.getDataRatisConsensusLogAppenderBufferSizeMax())
-                                      .build())
-                              .build())
-                      .build(),
-                  DataRegionConsensusImpl::createDataRegionStateMachine)
-              .orElseThrow(
-                  () ->
-                      new IllegalArgumentException(
-                          String.format(
-                              ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                              conf.getDataRegionConsensusProtocolClass())));
-    }
-    return INSTANCE;
-  }
+  private static class DataRegionConsensusImplHolder {
+
+    private static final IoTDBConfig CONF = 
IoTDBDescriptor.getInstance().getConfig();
+
+    private static final IConsensus INSTANCE =
+        ConsensusFactory.getConsensusImpl(
+                CONF.getDataRegionConsensusProtocolClass(),
+                ConsensusConfig.newBuilder()
+                    .setThisNodeId(CONF.getDataNodeId())
+                    .setThisNode(
+                        new TEndPoint(CONF.getInternalAddress(), 
CONF.getDataRegionConsensusPort()))
+                    .setStorageDir(CONF.getDataRegionConsensusDir())
+                    .setConsensusGroupType(TConsensusGroupType.DataRegion)
+                    .setIoTConsensusConfig(
+                        IoTConsensusConfig.newBuilder()
+                            .setRpc(
+                                RPC.newBuilder()
+                                    
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
+                                    
.setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
+                                    .setRpcMinConcurrentClientNum(
+                                        CONF.getRpcMinConcurrentClientNum())
+                                    .setRpcMaxConcurrentClientNum(
+                                        CONF.getRpcMaxConcurrentClientNum())
+                                    .setRpcThriftCompressionEnabled(
+                                        CONF.isRpcThriftCompressionEnable())
+                                    .setSelectorNumOfClientManager(
+                                        CONF.getSelectorNumOfClientManager())
+                                    .setThriftServerAwaitTimeForStopService(
+                                        
CONF.getThriftServerAwaitTimeForStopService())
+                                    
.setThriftMaxFrameSize(CONF.getThriftMaxFrameSize())
+                                    
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+                                    
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+                                    .build())
+                            .setReplication(
+                                IoTConsensusConfig.Replication.newBuilder()
+                                    
.setWalThrottleThreshold(CONF.getThrottleThreshold())
+                                    .setAllocateMemoryForConsensus(
+                                        CONF.getAllocateMemoryForConsensus())
+                                    
.setMaxLogEntriesNumPerBatch(CONF.getMaxLogEntriesNumPerBatch())
+                                    
.setMaxSizePerBatch(CONF.getMaxSizePerBatch())
+                                    
.setMaxPendingBatchesNum(CONF.getMaxPendingBatchesNum())
+                                    
.setMaxMemoryRatioForQueue(CONF.getMaxMemoryRatioForQueue())
+                                    .build())
+                            .build())
+                    .setRatisConfig(
+                        RatisConfig.newBuilder()
+                            // An empty log is committed after each restart, 
even if no data is
+                            // written. This setting ensures that compaction 
work is not discarded
+                            // even if there are frequent restarts
+                            .setSnapshot(
+                                Snapshot.newBuilder()
+                                    .setCreationGap(1)
+                                    .setAutoTriggerThreshold(
+                                        
CONF.getDataRatisConsensusSnapshotTriggerThreshold())
+                                    .build())
+                            .setLog(
+                                RatisConfig.Log.newBuilder()
+                                    .setUnsafeFlushEnabled(
+                                        
CONF.isDataRatisConsensusLogUnsafeFlushEnable())
+                                    
.setForceSyncNum(CONF.getDataRatisConsensusLogForceSyncNum())
+                                    .setSegmentSizeMax(
+                                        SizeInBytes.valueOf(
+                                            
CONF.getDataRatisConsensusLogSegmentSizeMax()))
+                                    .setPreserveNumsWhenPurge(
+                                        
CONF.getDataRatisConsensusPreserveWhenPurge())
+                                    .build())
+                            .setGrpc(
+                                RatisConfig.Grpc.newBuilder()
+                                    .setFlowControlWindow(
+                                        SizeInBytes.valueOf(
+                                            
CONF.getDataRatisConsensusGrpcFlowControlWindow()))
+                                    .setLeaderOutstandingAppendsMax(
+                                        
CONF.getDataRatisConsensusGrpcLeaderOutstandingAppendsMax())
+                                    .build())
+                            .setRpc(
+                                RatisConfig.Rpc.newBuilder()
+                                    .setTimeoutMin(
+                                        TimeDuration.valueOf(
+                                            
CONF.getDataRatisConsensusLeaderElectionTimeoutMinMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setTimeoutMax(
+                                        TimeDuration.valueOf(
+                                            
CONF.getDataRatisConsensusLeaderElectionTimeoutMaxMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setRequestTimeout(
+                                        TimeDuration.valueOf(
+                                            
CONF.getDataRatisConsensusRequestTimeoutMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setFirstElectionTimeoutMin(
+                                        TimeDuration.valueOf(
+                                            
CONF.getRatisFirstElectionTimeoutMinMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setFirstElectionTimeoutMax(
+                                        TimeDuration.valueOf(
+                                            
CONF.getRatisFirstElectionTimeoutMaxMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .build())
+                            .setClient(
+                                RatisConfig.Client.newBuilder()
+                                    .setClientRequestTimeoutMillis(
+                                        
CONF.getDataRatisConsensusRequestTimeoutMs())
+                                    .setClientMaxRetryAttempt(
+                                        
CONF.getDataRatisConsensusMaxRetryAttempts())
+                                    .setClientRetryInitialSleepTimeMs(
+                                        
CONF.getDataRatisConsensusInitialSleepTimeMs())
+                                    .setClientRetryMaxSleepTimeMs(
+                                        
CONF.getDataRatisConsensusMaxSleepTimeMs())
+                                    
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+                                    
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+                                    .build())
+                            .setImpl(
+                                RatisConfig.Impl.newBuilder()
+                                    
.setTriggerSnapshotFileSize(CONF.getDataRatisLogMax())
+                                    .build())
+                            .setLeaderLogAppender(
+                                RatisConfig.LeaderLogAppender.newBuilder()
+                                    .setBufferByteLimit(
+                                        
CONF.getDataRatisConsensusLogAppenderBufferSizeMax())
+                                    .build())
+                            .build())
+                    .build(),
+                DataRegionConsensusImplHolder::createDataRegionStateMachine)
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            CONF.getDataRegionConsensusProtocolClass())));
 
-  private static DataRegionStateMachine 
createDataRegionStateMachine(ConsensusGroupId gid) {
-    DataRegion dataRegion = 
StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
-    if 
(ConsensusFactory.IOT_CONSENSUS.equals(conf.getDataRegionConsensusProtocolClass()))
 {
-      return new IoTConsensusDataRegionStateMachine(dataRegion);
-    } else {
-      return new DataRegionStateMachine(dataRegion);
+    private static DataRegionStateMachine 
createDataRegionStateMachine(ConsensusGroupId gid) {
+      DataRegion dataRegion = 
StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
+      if 
(ConsensusFactory.IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass()))
 {
+        return new IoTConsensusDataRegionStateMachine(dataRegion);
+      } else {
+        return new DataRegionStateMachine(dataRegion);
+      }
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index a38b9dde4bc..0e39d28c66b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -42,112 +42,107 @@ import java.util.concurrent.TimeUnit;
  */
 public class SchemaRegionConsensusImpl {
 
-  private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
-
-  private static IConsensus INSTANCE = null;
-
-  private SchemaRegionConsensusImpl() {}
+  private SchemaRegionConsensusImpl() {
+    // do nothing
+  }
 
-  // need to create instance before calling this method
   public static IConsensus getInstance() {
-    return INSTANCE;
+    return SchemaRegionConsensusImplHolder.INSTANCE;
   }
 
-  public static synchronized IConsensus setupAndGetInstance() {
-    if (INSTANCE == null) {
-      INSTANCE =
-          ConsensusFactory.getConsensusImpl(
-                  conf.getSchemaRegionConsensusProtocolClass(),
-                  ConsensusConfig.newBuilder()
-                      .setThisNodeId(conf.getDataNodeId())
-                      .setThisNode(
-                          new TEndPoint(
-                              conf.getInternalAddress(), 
conf.getSchemaRegionConsensusPort()))
-                      .setConsensusGroupType(TConsensusGroupType.SchemaRegion)
-                      .setRatisConfig(
-                          RatisConfig.newBuilder()
-                              .setSnapshot(
-                                  RatisConfig.Snapshot.newBuilder()
-                                      .setAutoTriggerThreshold(
-                                          
conf.getSchemaRatisConsensusSnapshotTriggerThreshold())
-                                      .build())
-                              .setLog(
-                                  RatisConfig.Log.newBuilder()
-                                      .setUnsafeFlushEnabled(
-                                          
conf.isSchemaRatisConsensusLogUnsafeFlushEnable())
-                                      .setSegmentSizeMax(
-                                          SizeInBytes.valueOf(
-                                              
conf.getSchemaRatisConsensusLogSegmentSizeMax()))
-                                      .setPreserveNumsWhenPurge(
-                                          
conf.getSchemaRatisConsensusPreserveWhenPurge())
-                                      .build())
-                              .setGrpc(
-                                  RatisConfig.Grpc.newBuilder()
-                                      .setFlowControlWindow(
-                                          SizeInBytes.valueOf(
-                                              
conf.getSchemaRatisConsensusGrpcFlowControlWindow()))
-                                      .build())
-                              .setRpc(
-                                  RatisConfig.Rpc.newBuilder()
-                                      .setTimeoutMin(
-                                          TimeDuration.valueOf(
-                                              conf
-                                                  
.getSchemaRatisConsensusLeaderElectionTimeoutMinMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setTimeoutMax(
-                                          TimeDuration.valueOf(
-                                              conf
-                                                  
.getSchemaRatisConsensusLeaderElectionTimeoutMaxMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setRequestTimeout(
-                                          TimeDuration.valueOf(
-                                              
conf.getSchemaRatisConsensusRequestTimeoutMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setFirstElectionTimeoutMin(
-                                          TimeDuration.valueOf(
-                                              
conf.getRatisFirstElectionTimeoutMinMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setFirstElectionTimeoutMax(
-                                          TimeDuration.valueOf(
-                                              
conf.getRatisFirstElectionTimeoutMaxMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .build())
-                              .setClient(
-                                  RatisConfig.Client.newBuilder()
-                                      .setClientRequestTimeoutMillis(
-                                          
conf.getDataRatisConsensusRequestTimeoutMs())
-                                      .setClientMaxRetryAttempt(
-                                          
conf.getDataRatisConsensusMaxRetryAttempts())
-                                      .setClientRetryInitialSleepTimeMs(
-                                          
conf.getDataRatisConsensusInitialSleepTimeMs())
-                                      .setClientRetryMaxSleepTimeMs(
-                                          
conf.getDataRatisConsensusMaxSleepTimeMs())
-                                      .setCoreClientNumForEachNode(
-                                          conf.getCoreClientNumForEachNode())
-                                      
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                                      .build())
-                              .setImpl(
-                                  RatisConfig.Impl.newBuilder()
-                                      
.setTriggerSnapshotFileSize(conf.getSchemaRatisLogMax())
-                                      .build())
-                              .setLeaderLogAppender(
-                                  RatisConfig.LeaderLogAppender.newBuilder()
-                                      .setBufferByteLimit(
-                                          
conf.getSchemaRatisConsensusLogAppenderBufferSizeMax())
-                                      .build())
-                              .build())
-                      .setStorageDir(conf.getSchemaRegionConsensusDir())
-                      .build(),
-                  gid ->
-                      new SchemaRegionStateMachine(
-                          
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
-              .orElseThrow(
-                  () ->
-                      new IllegalArgumentException(
-                          String.format(
-                              ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                              conf.getSchemaRegionConsensusProtocolClass())));
-    }
-    return INSTANCE;
+  private static class SchemaRegionConsensusImplHolder {
+
+    private static final IoTDBConfig CONF = 
IoTDBDescriptor.getInstance().getConfig();
+    private static final IConsensus INSTANCE =
+        ConsensusFactory.getConsensusImpl(
+                CONF.getSchemaRegionConsensusProtocolClass(),
+                ConsensusConfig.newBuilder()
+                    .setThisNodeId(CONF.getDataNodeId())
+                    .setThisNode(
+                        new TEndPoint(
+                            CONF.getInternalAddress(), 
CONF.getSchemaRegionConsensusPort()))
+                    .setConsensusGroupType(TConsensusGroupType.SchemaRegion)
+                    .setRatisConfig(
+                        RatisConfig.newBuilder()
+                            .setSnapshot(
+                                RatisConfig.Snapshot.newBuilder()
+                                    .setAutoTriggerThreshold(
+                                        
CONF.getSchemaRatisConsensusSnapshotTriggerThreshold())
+                                    .build())
+                            .setLog(
+                                RatisConfig.Log.newBuilder()
+                                    .setUnsafeFlushEnabled(
+                                        
CONF.isSchemaRatisConsensusLogUnsafeFlushEnable())
+                                    .setSegmentSizeMax(
+                                        SizeInBytes.valueOf(
+                                            
CONF.getSchemaRatisConsensusLogSegmentSizeMax()))
+                                    .setPreserveNumsWhenPurge(
+                                        
CONF.getSchemaRatisConsensusPreserveWhenPurge())
+                                    .build())
+                            .setGrpc(
+                                RatisConfig.Grpc.newBuilder()
+                                    .setFlowControlWindow(
+                                        SizeInBytes.valueOf(
+                                            
CONF.getSchemaRatisConsensusGrpcFlowControlWindow()))
+                                    .build())
+                            .setRpc(
+                                RatisConfig.Rpc.newBuilder()
+                                    .setTimeoutMin(
+                                        TimeDuration.valueOf(
+                                            CONF
+                                                
.getSchemaRatisConsensusLeaderElectionTimeoutMinMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setTimeoutMax(
+                                        TimeDuration.valueOf(
+                                            CONF
+                                                
.getSchemaRatisConsensusLeaderElectionTimeoutMaxMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setRequestTimeout(
+                                        TimeDuration.valueOf(
+                                            
CONF.getSchemaRatisConsensusRequestTimeoutMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setFirstElectionTimeoutMin(
+                                        TimeDuration.valueOf(
+                                            
CONF.getRatisFirstElectionTimeoutMinMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setFirstElectionTimeoutMax(
+                                        TimeDuration.valueOf(
+                                            
CONF.getRatisFirstElectionTimeoutMaxMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .build())
+                            .setClient(
+                                RatisConfig.Client.newBuilder()
+                                    .setClientRequestTimeoutMillis(
+                                        
CONF.getDataRatisConsensusRequestTimeoutMs())
+                                    .setClientMaxRetryAttempt(
+                                        
CONF.getDataRatisConsensusMaxRetryAttempts())
+                                    .setClientRetryInitialSleepTimeMs(
+                                        
CONF.getDataRatisConsensusInitialSleepTimeMs())
+                                    .setClientRetryMaxSleepTimeMs(
+                                        
CONF.getDataRatisConsensusMaxSleepTimeMs())
+                                    
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+                                    
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+                                    .build())
+                            .setImpl(
+                                RatisConfig.Impl.newBuilder()
+                                    
.setTriggerSnapshotFileSize(CONF.getSchemaRatisLogMax())
+                                    .build())
+                            .setLeaderLogAppender(
+                                RatisConfig.LeaderLogAppender.newBuilder()
+                                    .setBufferByteLimit(
+                                        
CONF.getSchemaRatisConsensusLogAppenderBufferSizeMax())
+                                    .build())
+                            .build())
+                    .setStorageDir(CONF.getSchemaRegionConsensusDir())
+                    .build(),
+                gid ->
+                    new SchemaRegionStateMachine(
+                        
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            CONF.getSchemaRegionConsensusProtocolClass())));
   }
 }

Reply via email to