This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6114fd4cbe4 Fixed simple consensus correctness problem when recovering
(#12173)
6114fd4cbe4 is described below
commit 6114fd4cbe47f7785a925400dccb9e5d92c34c0e
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 15 17:00:31 2024 +0800
Fixed simple consensus correctness problem when recovering (#12173)
Previously, the simple consensus could write to the state machine in
parallel, which could cause correctness problems and leave the operations in
the log in a different order from their order of execution. This PR fixes the
problem by adding `synchronized` to the simple consensus operations.
---
.../statemachine/ConfigRegionStateMachine.java | 7 +++----
.../confignode/manager/consensus/ConsensusManager.java | 5 ++---
.../consensus/simple/SimpleConsensusServerImpl.java | 17 +++++++++--------
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 5 ++---
.../schemaregion/mtree/impl/mem/MemMTreeStore.java | 1 +
5 files changed, 17 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index c6fd310ecfc..e8793dcf8fb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -295,8 +295,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
return CommonDescriptor.getInstance().getConfig().isReadOnly();
}
- /** TODO optimize the lock usage. */
- private synchronized void writeLogForSimpleConsensus(ConfigPhysicalPlan
plan) {
+ private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
try {
simpleLogWriter.force();
@@ -409,8 +408,8 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
}
}
- private void createLogFile(int endIndex) {
- simpleLogFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH +
endIndex);
+ private void createLogFile(int startIndex) {
+ simpleLogFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH +
startIndex);
try {
if (!simpleLogFile.createNewFile()) {
LOGGER.warn(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 0e329fda460..3d78a858205 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -75,8 +75,7 @@ public class ConsensusManager {
private final IManager configManager;
private IConsensus consensusImpl;
- public ConsensusManager(IManager configManager, ConfigRegionStateMachine
stateMachine)
- throws IOException {
+ public ConsensusManager(IManager configManager, ConfigRegionStateMachine
stateMachine) {
this.configManager = configManager;
setConsensusLayer(stateMachine);
}
@@ -107,7 +106,7 @@ public class ConsensusManager {
}
/** ConsensusLayer local implementation. */
- private void setConsensusLayer(ConfigRegionStateMachine stateMachine) throws
IOException {
+ private void setConsensusLayer(ConfigRegionStateMachine stateMachine) {
if (SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
upgrade();
consensusImpl =
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
index 22b74b74538..fa3c9dff37b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
@@ -26,13 +26,12 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import java.io.File;
-import java.util.concurrent.atomic.AtomicBoolean;
public class SimpleConsensusServerImpl implements IStateMachine {
private final Peer peer;
private final IStateMachine stateMachine;
- private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private boolean initialized = false;
public SimpleConsensusServerImpl(Peer peer, IStateMachine stateMachine) {
this.peer = peer;
@@ -48,8 +47,9 @@ public class SimpleConsensusServerImpl implements
IStateMachine {
}
@Override
- public void start() {
- if (initialized.compareAndSet(false, true)) {
+ public synchronized void start() {
+ if (!initialized) {
+ initialized = true;
stateMachine.start();
// Notify itself as the leader
stateMachine.event().notifyLeaderChanged(peer.getGroupId(),
peer.getNodeId());
@@ -58,7 +58,7 @@ public class SimpleConsensusServerImpl implements
IStateMachine {
}
@Override
- public void stop() {
+ public synchronized void stop() {
stateMachine.stop();
}
@@ -68,7 +68,7 @@ public class SimpleConsensusServerImpl implements
IStateMachine {
}
@Override
- public TSStatus write(IConsensusRequest request) {
+ public synchronized TSStatus write(IConsensusRequest request) {
return stateMachine.write(request);
}
@@ -77,18 +77,19 @@ public class SimpleConsensusServerImpl implements
IStateMachine {
return request;
}
+ // The state machine below shall ensure the correctness of concurrent
read-write.
@Override
public DataSet read(IConsensusRequest request) {
return stateMachine.read(request);
}
@Override
- public boolean takeSnapshot(File snapshotDir) {
+ public synchronized boolean takeSnapshot(File snapshotDir) {
return stateMachine.takeSnapshot(snapshotDir);
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {
+ public synchronized void loadSnapshot(File latestSnapshotRootDir) {
stateMachine.loadSnapshot(latestSnapshotRootDir);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index ba102219a8e..6a00d430ad0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -254,7 +254,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
private void initMLog() throws IOException {
- int lineNumber = initFromLog();
+ initFromLog();
logWriter =
new SchemaLogWriter<>(
@@ -776,8 +776,7 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
plan.getViewPathToSourceExpressionMap();
for (PartialPath path : pathList) {
// create one logical view
- IMeasurementMNode<IMemMNode> leafMNode =
- mtree.createLogicalView(path, viewPathToSourceMap.get(path));
+ mtree.createLogicalView(path, viewPathToSourceMap.get(path));
}
// write log
if (!isRecovering) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
index f0dddb2bf32..25481639613 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MemMTreeStore.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem;
import org.apache.iotdb.commons.conf.CommonDescriptor;