This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/master in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 067bafa820b9bedf42ccdea6b32c3597dfa33e81 Author: Beyyes <[email protected]> AuthorDate: Mon Oct 31 14:31:23 2022 +0800 add init method for PartitionRegionStateMachine --- .../statemachine/PartitionRegionStateMachine.java | 48 +++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java index f6b940d548..f7c9a5b2ad 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java @@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeExc import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor; import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader; +import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; @@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -120,6 +122,46 @@ public class PartitionRegionStateMachine result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); } + if (ConsensusFactory.StandAloneConsensus.equals(CONF.getConfigNodeConsensusProtocolClass())) { + if (logFile.length() > FILE_MAX_SIZE) { + try { + logWriter.force(); + } catch (IOException e) { + LOGGER.error("Can't force logWrite for ConfigNode Standalone mode", e); + } + for (int retry = 0; retry < 5; retry++) { + try { + logWriter.close(); + } catch (IOException e) { + LOGGER.warn( + "Can't close StandAloneLog for ConfigNode Standalone mode, filePath: {}, retry: {}", + logFile.getAbsolutePath(), + retry); + try { + // Sleep 1s and retry + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); + LOGGER.warn("Unexpected interruption during the close method of logWriter"); + } + continue; + } + break; + } + createLogFile(logFileId + 1); + } + + try { + ByteBuffer buffer = plan.serializeToByteBuffer(); + // The method logWriter.write will execute flip() firstly, so we must make position==limit + buffer.position(buffer.limit()); + logWriter.write(buffer); + } catch (IOException e) { + LOGGER.error( + "can't serialize current ConfigPhysicalPlan for ConfigNode Standalone mode", e); + } + } + return result; } @@ -212,7 +254,11 @@ public class PartitionRegionStateMachine } @Override - public void start() {} + public void start() { + if (ConsensusFactory.StandAloneConsensus.equals(CONF.getConfigNodeConsensusProtocolClass())) { + initStandAloneConfigNode(); + } + } @Override public void stop() {
