This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6114fd4cbe4 Fixed simple consensus correctness problem when recovering 
(#12173)
6114fd4cbe4 is described below

commit 6114fd4cbe47f7785a925400dccb9e5d92c34c0e
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 15 17:00:31 2024 +0800

    Fixed simple consensus correctness problem when recovering (#12173)
    
    Previously, the simple consensus could write to the state machine in 
parallel, which could cause correctness problems and leave the operations in 
the log in a different order from their actual execution. This PR fixes the 
problem by adding synchronized to the simple consensus operations.
---
 .../statemachine/ConfigRegionStateMachine.java          |  7 +++----
 .../confignode/manager/consensus/ConsensusManager.java  |  5 ++---
 .../consensus/simple/SimpleConsensusServerImpl.java     | 17 +++++++++--------
 .../schemaregion/impl/SchemaRegionMemoryImpl.java       |  5 ++---
 .../schemaregion/mtree/impl/mem/MemMTreeStore.java      |  1 +
 5 files changed, 17 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index c6fd310ecfc..e8793dcf8fb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -295,8 +295,7 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
     return CommonDescriptor.getInstance().getConfig().isReadOnly();
   }
 
-  /** TODO optimize the lock usage. */
-  private synchronized void writeLogForSimpleConsensus(ConfigPhysicalPlan 
plan) {
+  private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
     if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
       try {
         simpleLogWriter.force();
@@ -409,8 +408,8 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
     }
   }
 
-  private void createLogFile(int endIndex) {
-    simpleLogFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH + 
endIndex);
+  private void createLogFile(int startIndex) {
+    simpleLogFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH + 
startIndex);
     try {
       if (!simpleLogFile.createNewFile()) {
         LOGGER.warn(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 0e329fda460..3d78a858205 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -75,8 +75,7 @@ public class ConsensusManager {
   private final IManager configManager;
   private IConsensus consensusImpl;
 
-  public ConsensusManager(IManager configManager, ConfigRegionStateMachine 
stateMachine)
-      throws IOException {
+  public ConsensusManager(IManager configManager, ConfigRegionStateMachine 
stateMachine) {
     this.configManager = configManager;
     setConsensusLayer(stateMachine);
   }
@@ -107,7 +106,7 @@ public class ConsensusManager {
   }
 
   /** ConsensusLayer local implementation. */
-  private void setConsensusLayer(ConfigRegionStateMachine stateMachine) throws 
IOException {
+  private void setConsensusLayer(ConfigRegionStateMachine stateMachine) {
     if (SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
       upgrade();
       consensusImpl =
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
index 22b74b74538..fa3c9dff37b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
@@ -26,13 +26,12 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import java.io.File;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class SimpleConsensusServerImpl implements IStateMachine {
 
   private final Peer peer;
   private final IStateMachine stateMachine;
-  private final AtomicBoolean initialized = new AtomicBoolean(false);
+  private boolean initialized = false;
 
   public SimpleConsensusServerImpl(Peer peer, IStateMachine stateMachine) {
     this.peer = peer;
@@ -48,8 +47,9 @@ public class SimpleConsensusServerImpl implements 
IStateMachine {
   }
 
   @Override
-  public void start() {
-    if (initialized.compareAndSet(false, true)) {
+  public synchronized void start() {
+    if (!initialized) {
+      initialized = true;
       stateMachine.start();
       // Notify itself as the leader
       stateMachine.event().notifyLeaderChanged(peer.getGroupId(), 
peer.getNodeId());
@@ -58,7 +58,7 @@ public class SimpleConsensusServerImpl implements 
IStateMachine {
   }
 
   @Override
-  public void stop() {
+  public synchronized void stop() {
     stateMachine.stop();
   }
 
@@ -68,7 +68,7 @@ public class SimpleConsensusServerImpl implements 
IStateMachine {
   }
 
   @Override
-  public TSStatus write(IConsensusRequest request) {
+  public synchronized TSStatus write(IConsensusRequest request) {
     return stateMachine.write(request);
   }
 
@@ -77,18 +77,19 @@ public class SimpleConsensusServerImpl implements 
IStateMachine {
     return request;
   }
 
+  // The state machine below shall ensure the correctness of concurrent 
read-write.
   @Override
   public DataSet read(IConsensusRequest request) {
     return stateMachine.read(request);
   }
 
   @Override
-  public boolean takeSnapshot(File snapshotDir) {
+  public synchronized boolean takeSnapshot(File snapshotDir) {
     return stateMachine.takeSnapshot(snapshotDir);
   }
 
   @Override
-  public void loadSnapshot(File latestSnapshotRootDir) {
+  public synchronized void loadSnapshot(File latestSnapshotRootDir) {
     stateMachine.loadSnapshot(latestSnapshotRootDir);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index ba102219a8e..6a00d430ad0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -254,7 +254,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
   }
 
   private void initMLog() throws IOException {
-    int lineNumber = initFromLog();
+    initFromLog();
 
     logWriter =
         new SchemaLogWriter<>(
@@ -776,8 +776,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
           plan.getViewPathToSourceExpressionMap();
       for (PartialPath path : pathList) {
         // create one logical view
-        IMeasurementMNode<IMemMNode> leafMNode =
-            mtree.createLogicalView(path, viewPathToSourceMap.get(path));
+        mtree.createLogicalView(path, viewPathToSourceMap.get(path));
       }
       // write log
       if (!isRecovering) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
index f0dddb2bf32..25481639613 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;

Reply via email to