This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_catch_up
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_catch_up by this push:
new 34879e6bef add state machine integration
34879e6bef is described below
commit 34879e6bef9b986e7fe3f7e24417a14bf43c209d
Author: jt <[email protected]>
AuthorDate: Wed Jun 15 11:26:31 2022 +0800
add state machine integration
---
.../iotdb/cluster/coordinator/Coordinator.java | 19 +-
.../iotdb/cluster/log/applier/BaseApplier.java | 210 +--------------------
.../iotdb/cluster/log/applier/DataLogApplier.java | 99 ++--------
.../iotdb/cluster/log/applier/MetaLogApplier.java | 22 ++-
.../manage/FilePartitionedSnapshotLogManager.java | 20 +-
.../log/manage/MetaSingleSnapshotLogManager.java | 14 +-
.../log/manage/PartitionedSnapshotLogManager.java | 6 +-
.../iotdb/cluster/log/manage/RaftLogManager.java | 13 +-
.../cluster/server/member/DataGroupMember.java | 29 ++-
.../cluster/server/member/MetaGroupMember.java | 12 +-
.../iotdb/cluster/server/member/RaftMember.java | 20 +-
.../cluster/common/TestPartitionedLogManager.java | 4 +-
.../cluster/log/applier/DataLogApplierTest.java | 2 +-
13 files changed, 101 insertions(+), 369 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index c03b1c0a80..83824e9561 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -148,29 +148,12 @@ public class Coordinator {
/** execute a non-query plan that is not necessary to be executed on other
nodes. */
private TSStatus executeNonQueryLocally(IConsensusRequest request) {
- boolean execRet;
try {
- if (request instanceof PhysicalPlan) {
- execRet =
metaGroupMember.getLocalExecutor().processNonQuery(((PhysicalPlan) request));
- } else {
- return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
- "Unsupported request: " + request);
- }
- } catch (QueryProcessException e) {
- if (e.getErrorCode() !=
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
- logger.debug("meet error while processing non-query. ", e);
- } else {
- logger.warn("meet error while processing non-query. ", e);
- }
- return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ return metaGroupMember.getStateMachine().write(request);
} catch (Exception e) {
logger.error("{}: server Internal Error: ",
IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
}
-
- return execRet
- ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully")
- : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
/**
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 34767fa6e9..876be2bfde 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
@@ -38,7 +39,6 @@ import
org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import
org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -54,212 +54,14 @@ import java.util.Collections;
/** BaseApplier use PlanExecutor to execute PhysicalPlans. */
abstract class BaseApplier implements LogApplier {
- private static final Logger logger =
LoggerFactory.getLogger(BaseApplier.class);
+ IStateMachine stateMachine;
- MetaGroupMember metaGroupMember;
- private PlanExecutor queryExecutor;
-
- BaseApplier(MetaGroupMember metaGroupMember) {
- this.metaGroupMember = metaGroupMember;
- }
-
- /**
- * @param request
- * @param dataGroupMember the data group member that is applying the log,
null if the log is
- * applied by a meta group member
- * @throws QueryProcessException
- * @throws StorageGroupNotSetException
- * @throws StorageEngineException
- */
- void applyRequest(IConsensusRequest request, DataGroupMember dataGroupMember)
- throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
- if (request instanceof InsertPlan) {
- processPlanWithTolerance((InsertPlan) request, dataGroupMember);
- } else if (request instanceof PhysicalPlan && !((PhysicalPlan)
request).isQuery()) {
- PhysicalPlan plan = ((PhysicalPlan) request);
- try {
- getQueryExecutor().processNonQuery(((PhysicalPlan) request));
- } catch (BatchProcessException e) {
- handleBatchProcessException(e, plan);
- } catch (QueryProcessException e) {
- if (e.getCause() instanceof StorageGroupNotSetException
- || e.getCause() instanceof UndefinedTemplateException) {
- executeAfterSync(plan);
- } else {
- throw e;
- }
- } catch (StorageGroupNotSetException e) {
- executeAfterSync(plan);
- }
- } else if (request != null) {
- throw new QueryProcessException("Unsupported request: " + request);
- }
- }
-
- private void handleBatchProcessException(
- BatchProcessException e, InsertPlan plan, DataGroupMember
dataGroupMember)
- throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
- TSStatus[] failingStatus = e.getFailingStatus();
- for (int i = 0; i < failingStatus.length; i++) {
- TSStatus status = failingStatus[i];
- // skip succeeded plans in later execution
- if (status != null
- && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && plan instanceof BatchPlan) {
- ((BatchPlan) plan).setIsExecuted(i);
- }
- }
-
- boolean needRetry = false, hasError = false;
- for (int i = 0, failingStatusLength = failingStatus.length; i <
failingStatusLength; i++) {
- TSStatus status = failingStatus[i];
- if (status != null) {
- if (status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
- && plan instanceof BatchPlan) {
- ((BatchPlan) plan).unsetIsExecuted(i);
- needRetry = true;
- } else if (status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- hasError = true;
- }
- }
- }
- if (hasError) {
- throw e;
- }
- if (needRetry) {
- pullTimeseriesSchema(plan, dataGroupMember.getHeader());
- plan.recoverFromFailure();
- getQueryExecutor().processNonQuery(plan);
- }
- } else {
- throw e;
- }
- }
-
- private void handleBatchProcessException(BatchProcessException e,
PhysicalPlan plan)
- throws QueryProcessException, StorageEngineException,
StorageGroupNotSetException {
- TSStatus[] failingStatus = e.getFailingStatus();
- boolean needThrow = false;
- for (int i = 0; i < failingStatus.length; i++) {
- TSStatus status = failingStatus[i];
- // skip succeeded plans in later execution
- if (status != null
- && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && plan instanceof BatchPlan) {
- ((BatchPlan) plan).setIsExecuted(i);
- }
-
- if (plan instanceof DeleteTimeSeriesPlan) {
- if (status != null && status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- if (status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
- logger.info("{} doesn't exist, it may has been deleted.",
plan.getPaths().get(i));
- } else {
- needThrow = true;
- }
- }
- }
- }
- boolean needRetry = false;
- for (int i = 0, failingStatusLength = failingStatus.length; i <
failingStatusLength; i++) {
- TSStatus status = failingStatus[i];
- if (status != null
- && (status.getCode() ==
TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode()
- || status.getCode() ==
TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode())
- && plan instanceof BatchPlan) {
- ((BatchPlan) plan).unsetIsExecuted(i);
- needRetry = true;
- }
- }
- if (needRetry) {
- executeAfterSync(plan);
- return;
- }
-
- if (!(plan instanceof DeleteTimeSeriesPlan) || needThrow) {
- throw e;
- }
- }
-
- private void executeAfterSync(PhysicalPlan plan)
- throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
- try {
- metaGroupMember.syncLeaderWithConsistencyCheck(true);
- } catch (CheckConsistencyException ce) {
- throw new QueryProcessException(ce.getMessage());
- }
- getQueryExecutor().processNonQuery(plan);
- }
-
- /**
- * @param plan
- * @param dataGroupMember the data group member that is applying the log,
null if the log is
- * applied by a meta group member
- * @throws QueryProcessException
- * @throws StorageGroupNotSetException
- * @throws StorageEngineException
- */
- private void processPlanWithTolerance(InsertPlan plan, DataGroupMember
dataGroupMember)
- throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
- try {
- getQueryExecutor().processNonQuery(plan);
- } catch (BatchProcessException e) {
- handleBatchProcessException(e, plan, dataGroupMember);
- } catch (QueryProcessException | StorageGroupNotSetException |
StorageEngineException e) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
- // check if this is caused by metadata missing, if so, pull metadata
and retry
- Throwable metaMissingException =
SchemaUtils.findMetaMissingException(e);
- boolean causedByPathNotExist = metaMissingException instanceof
PathNotExistException;
-
- if (causedByPathNotExist) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Timeseries is not found locally[{}], try pulling it from
another group: {}",
- metaGroupMember.getName(),
- e.getCause().getMessage());
- }
- pullTimeseriesSchema(plan, dataGroupMember.getHeader());
- plan.recoverFromFailure();
- getQueryExecutor().processNonQuery(plan);
- } else {
- throw e;
- }
- } else {
- throw e;
- }
- }
- }
-
- /**
- * @param plan
- * @param ignoredGroup do not pull schema from the group to avoid backward
dependency
- * @throws QueryProcessException
- */
- private void pullTimeseriesSchema(InsertPlan plan, RaftNode ignoredGroup)
- throws QueryProcessException {
- try {
- if (plan instanceof BatchPlan) {
- MetaPuller.getInstance()
- .pullTimeSeriesSchemas(((BatchPlan) plan).getPrefixPaths(),
ignoredGroup);
- } else {
- PartialPath path = plan.getDevicePath();
- MetaPuller.getInstance()
- .pullTimeSeriesSchemas(Collections.singletonList(path),
ignoredGroup);
- }
- } catch (MetadataException e1) {
- throw new QueryProcessException(e1);
- }
- }
-
- private PlanExecutor getQueryExecutor() throws QueryProcessException {
- if (queryExecutor == null) {
- queryExecutor = new ClusterPlanExecutor(metaGroupMember);
- }
- return queryExecutor;
+ BaseApplier(IStateMachine stateMachine) {
+ this.stateMachine = stateMachine;
}
@TestOnly
- public void setQueryExecutor(PlanExecutor queryExecutor) {
- this.queryExecutor = queryExecutor;
+ public void setStateMachine(IStateMachine stateMachine) {
+ this.stateMachine = stateMachine;
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 95d1f126c1..266e61c332 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -20,32 +20,23 @@
package org.apache.iotdb.cluster.log.applier;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.IOUtils;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.service.IoTDB;
-
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,8 +49,9 @@ public class DataLogApplier extends BaseApplier {
protected DataGroupMember dataGroupMember;
- public DataLogApplier(MetaGroupMember metaGroupMember, DataGroupMember
dataGroupMember) {
- super(metaGroupMember);
+ public DataLogApplier(DataGroupMember dataGroupMember,
+ IStateMachine stateMachine) {
+ super(stateMachine);
this.dataGroupMember = dataGroupMember;
}
@@ -83,7 +75,10 @@ public class DataLogApplier extends BaseApplier {
} else if (log instanceof RequestLog) {
RequestLog requestLog = (RequestLog) log;
IConsensusRequest request = requestLog.getRequest();
- applyRequest(request);
+ TSStatus status = applyRequest(request);
+ if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ log.setException(new QueryProcessException(status.message,
status.code));
+ }
} else if (log instanceof CloseFileLog) {
CloseFileLog closeFileLog = ((CloseFileLog) log);
StorageEngine.getInstance()
@@ -106,81 +101,13 @@ public class DataLogApplier extends BaseApplier {
}
}
- public void applyRequest(IConsensusRequest request)
- throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
+ public TSStatus applyRequest(IConsensusRequest request) {
if (request instanceof DeletePlan) {
((DeletePlan)
request).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
} else if (request instanceof DeleteTimeSeriesPlan) {
((DeleteTimeSeriesPlan)
request).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
}
- if (request instanceof InsertMultiTabletsPlan) {
- applyInsert((InsertMultiTabletsPlan) request);
- } else if (request instanceof InsertRowsPlan) {
- applyInsert((InsertRowsPlan) request);
- } else if (request instanceof InsertPlan) {
- applyInsert((InsertPlan) request);
- } else {
- applyRequest(request, dataGroupMember);
- }
- }
- private void applyInsert(InsertMultiTabletsPlan plan)
- throws StorageGroupNotSetException, QueryProcessException,
StorageEngineException {
- boolean hasSync = false;
- for (InsertTabletPlan insertTabletPlan : plan.getInsertTabletPlanList()) {
- try {
-
IoTDB.schemaProcessor.getBelongedStorageGroup(insertTabletPlan.getDevicePath());
- } catch (StorageGroupNotSetException e) {
- try {
- if (!hasSync) {
- metaGroupMember.syncLeaderWithConsistencyCheck(true);
- hasSync = true;
- } else {
- throw new StorageEngineException(e.getMessage());
- }
- } catch (CheckConsistencyException ce) {
- throw new QueryProcessException(ce.getMessage());
- }
- }
- }
- applyRequest(plan, dataGroupMember);
- }
-
- private void applyInsert(InsertRowsPlan plan)
- throws StorageGroupNotSetException, QueryProcessException,
StorageEngineException {
- boolean hasSync = false;
- for (InsertRowPlan insertRowPlan : plan.getInsertRowPlanList()) {
- try {
-
IoTDB.schemaProcessor.getBelongedStorageGroup(insertRowPlan.getDevicePath());
- } catch (StorageGroupNotSetException e) {
- try {
- if (!hasSync) {
- metaGroupMember.syncLeaderWithConsistencyCheck(true);
- hasSync = true;
- } else {
- throw new StorageEngineException(e.getMessage());
- }
- } catch (CheckConsistencyException ce) {
- throw new QueryProcessException(ce.getMessage());
- }
- }
- }
- applyRequest(plan, dataGroupMember);
- }
-
- private void applyInsert(InsertPlan plan)
- throws StorageGroupNotSetException, QueryProcessException,
StorageEngineException {
- try {
- IoTDB.schemaProcessor.getBelongedStorageGroup(plan.getDevicePath());
- } catch (StorageGroupNotSetException e) {
- // the sg may not exist because the node does not catch up with the
leader, retry after
- // synchronization
- try {
- metaGroupMember.syncLeaderWithConsistencyCheck(true);
- } catch (CheckConsistencyException ce) {
- throw new QueryProcessException(ce.getMessage());
- }
- }
- applyRequest(plan, dataGroupMember);
+ return stateMachine.write(request);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index c75c15a914..1259c0df84 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.db.mpp.execution.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,19 +40,19 @@ public class MetaLogApplier extends BaseApplier {
private static final Logger logger =
LoggerFactory.getLogger(MetaLogApplier.class);
private MetaGroupMember member;
- public MetaLogApplier(MetaGroupMember member) {
- super(member);
+ public MetaLogApplier(MetaGroupMember member, IStateMachine stateMachine) {
+ super(stateMachine);
this.member = member;
}
@Override
public void apply(Log log) {
try {
- logger.debug("MetaMember [{}] starts applying Log {}",
metaGroupMember.getName(), log);
+ logger.debug("MetaMember [{}] starts applying Log {}", member.getName(),
log);
if (log instanceof AddNodeLog) {
applyAddNodeLog((AddNodeLog) log);
} else if (log instanceof RequestLog) {
- applyRequest(((RequestLog) log).getRequest(), null);
+ stateMachine.write(((RequestLog) log).getRequest());
} else if (log instanceof RemoveNodeLog) {
applyRemoveNodeLog((RemoveNodeLog) log);
} else if (log instanceof EmptyContentLog || log instanceof
FragmentedLog) {
@@ -67,24 +69,24 @@ public class MetaLogApplier extends BaseApplier {
}
private void applyAddNodeLog(AddNodeLog log) throws
ChangeMembershipException {
- if
(!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+ if (!member.getPartitionTable().deserialize(log.getPartitionTable())) {
logger.info("Ignore previous change membership log");
// ignore previous change membership log
return;
}
- if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
- metaGroupMember.getCoordinator().sendLogToAllDataGroups(log);
+ if (member.getCharacter() == NodeCharacter.LEADER) {
+ member.getCoordinator().sendLogToAllDataGroups(log);
}
member.applyAddNode(log);
}
private void applyRemoveNodeLog(RemoveNodeLog log) throws
ChangeMembershipException {
- if
(!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+ if (!member.getPartitionTable().deserialize(log.getPartitionTable())) {
// ignore previous change membership log
return;
}
- if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
- metaGroupMember.getCoordinator().sendLogToAllDataGroups(log);
+ if (member.getCharacter() == NodeCharacter.LEADER) {
+ member.getCoordinator().sendLogToAllDataGroups(log);
}
member.applyRemoveNode(log);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 3f25525ad6..656def1b65 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -20,9 +20,12 @@
package org.apache.iotdb.cluster.log.manage;
import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.EntryCompactedException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.applier.AsyncDataLogApplier;
+import org.apache.iotdb.cluster.log.applier.DataLogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
@@ -32,8 +35,10 @@ import
org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.execution.StateMachine;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
@@ -62,12 +67,23 @@ public class FilePartitionedSnapshotLogManager extends
PartitionedSnapshotLogMan
LoggerFactory.getLogger(FilePartitionedSnapshotLogManager.class);
public FilePartitionedSnapshotLogManager(
- LogApplier logApplier,
+ IStateMachine stateMachine,
PartitionTable partitionTable,
Node header,
Node thisNode,
DataGroupMember dataGroupMember) {
- super(logApplier, partitionTable, header, thisNode, Factory.INSTANCE,
dataGroupMember);
+ super(createLogApplier(dataGroupMember, stateMachine), partitionTable,
header, thisNode, Factory.INSTANCE,
+ dataGroupMember, stateMachine);
+ }
+
+ private static LogApplier createLogApplier(
+ DataGroupMember dataGroupMember, IStateMachine stateMachine) {
+ LogApplier applier = new DataLogApplier(dataGroupMember, stateMachine);
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
+ && ClusterDescriptor.getInstance().getConfig().getReplicationNum() !=
1) {
+ applier = new AsyncDataLogApplier(applier, dataGroupMember.getName());
+ }
+ return applier;
}
/** send FlushPlan to all nodes in one dataGroup */
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index d9bc9e8f9f..32c7d19ce5 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.cluster.log.manage;
-import org.apache.iotdb.cluster.log.LogApplier;
+import java.io.IOException;
+import java.util.Map;
import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -30,16 +32,13 @@ import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
import org.apache.iotdb.commons.auth.entity.Role;
import org.apache.iotdb.commons.auth.entity.User;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
import org.apache.iotdb.db.service.IoTDB;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-
/** MetaSingleSnapshotLogManager provides a MetaSimpleSnapshot as snapshot. */
public class MetaSingleSnapshotLogManager extends RaftLogManager {
@@ -52,8 +51,9 @@ public class MetaSingleSnapshotLogManager extends
RaftLogManager {
private long commitIndex;
private long term;
- public MetaSingleSnapshotLogManager(LogApplier logApplier, MetaGroupMember
metaGroupMember) {
- super(new SyncLogDequeSerializer(0), logApplier,
metaGroupMember.getName());
+ public MetaSingleSnapshotLogManager(IStateMachine stateMachine,
MetaGroupMember metaGroupMember) {
+ super(new SyncLogDequeSerializer(0), new MetaLogApplier(metaGroupMember,
stateMachine),
+ metaGroupMember.getName(), stateMachine);
this.metaGroupMember = metaGroupMember;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 88713aacc5..8e2c7f46c0 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
@@ -73,11 +74,12 @@ public abstract class PartitionedSnapshotLogManager<T
extends Snapshot> extends
Node header,
Node thisNode,
SnapshotFactory<T> factory,
- DataGroupMember dataGroupMember) {
+ DataGroupMember dataGroupMember, IStateMachine stateMachine) {
super(
new SyncLogDequeSerializer(header.nodeIdentifier),
logApplier,
- Integer.toString(header.getNodeIdentifier()));
+ Integer.toString(header.getNodeIdentifier()),
+ stateMachine);
this.partitionTable = partitionTable;
this.factory = factory;
this.thisNode = thisNode;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index d65e060da9..ec2dbcbd6e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -31,11 +31,13 @@ import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.log.StableEntryManager;
import org.apache.iotdb.cluster.log.manage.serializable.LogManagerMeta;
+import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,9 +113,14 @@ public abstract class RaftLogManager {
protected List<Log> blockedUnappliedLogList;
- protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier
applier, String name) {
+ protected IStateMachine stateMachine;
+
+ protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier
applier, String name
+ , IStateMachine stateMachine) {
this.logApplier = applier;
this.name = name;
+ this.stateMachine = stateMachine;
+
LogManagerMeta meta = stableEntryManager.getMeta();
this.setCommittedEntryManager(new CommittedEntryManager(maxNumOfLogsInMem,
meta));
this.setStableEntryManager(stableEntryManager);
@@ -1071,8 +1078,4 @@ public abstract class RaftLogManager {
public long getBlockAppliedCommitIndex() {
return blockAppliedCommitIndex;
}
-
- public RaftLogManager(LogApplier logApplier) {
- this.logApplier = logApplier;
- }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 173b7bcc9c..7c0647b24d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -84,6 +84,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -171,7 +172,6 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
private LocalQueryExecutor localQueryExecutor;
- LogApplier dataLogApplier;
/**
* When a new partition table is installed, all data members will be checked
if unchanged. If not,
* such members will be removed.
@@ -181,7 +181,8 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
private LastAppliedPatitionTableVersion lastAppliedPartitionTableVersion;
@TestOnly
- public DataGroupMember(Node thisNode, PartitionGroup nodes) {
+ public DataGroupMember(Node thisNode, PartitionGroup nodes, IStateMachine
stateMachine) {
+ super(stateMachine);
// constructor for test
this.name =
"Data-"
@@ -211,7 +212,8 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
logSequencer = SEQUENCER_FACTORY.create(this, logManager);
}
- DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember
metaGroupMember) {
+ DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember
metaGroupMember,
+ IStateMachine stateMachine) {
// The name is used in JMX, so we have to avoid to use "(" "," "=" ")"
super(
"Data-"
@@ -223,7 +225,8 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
+ "",
new ClientManager(
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
- ClientManager.Type.DataGroupClient));
+ ClientManager.Type.DataGroupClient),
+ stateMachine);
groupId = new DataRegionId(nodes.getHeader().node.nodeIdentifier +
nodes.getHeader().raftId);
this.metaGroupMember = metaGroupMember;
setThisNode(thisNode);
@@ -237,14 +240,10 @@ public class DataGroupMember extends RaftMember
implements DataGroupMemberMBean
getRaftGroupId());
setQueryManager(new ClusterQueryManager());
slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(),
getName());
- dataLogApplier = new DataLogApplier(metaGroupMember, this);
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
- && ClusterDescriptor.getInstance().getConfig().getReplicationNum() !=
1) {
- dataLogApplier = new AsyncDataLogApplier(dataLogApplier, name);
- }
+
logManager =
new FilePartitionedSnapshotLogManager(
- dataLogApplier, metaGroupMember.getPartitionTable(),
allNodes.get(0), thisNode, this);
+ stateMachine, metaGroupMember.getPartitionTable(),
allNodes.get(0), thisNode, this);
logSequencer = SEQUENCER_FACTORY.create(this, logManager);
initPeerMap();
term.set(logManager.getHardState().getCurrentTerm());
@@ -343,16 +342,14 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
public static class Factory {
- private TProtocolFactory protocolFactory;
private MetaGroupMember metaGroupMember;
- public Factory(TProtocolFactory protocolFactory, MetaGroupMember metaGroupMember) {
- this.protocolFactory = protocolFactory;
+ public Factory(MetaGroupMember metaGroupMember) {
this.metaGroupMember = metaGroupMember;
}
- public DataGroupMember create(Node thisNode, PartitionGroup partitionGroup) {
- return new DataGroupMember(thisNode, partitionGroup, metaGroupMember);
+ public DataGroupMember create(Node thisNode, PartitionGroup partitionGroup, IStateMachine stateMachine) {
+ return new DataGroupMember(thisNode, partitionGroup, metaGroupMember, stateMachine);
}
}
@@ -717,7 +714,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
}
handleChangeMembershipLogWithoutRaft(log);
} else {
- ((DataLogApplier) dataLogApplier).applyRequest(request);
+ return new ConsensusWriteResponse(null, stateMachine.write(request));
}
return new ConsensusWriteResponse(null, StatusUtils.OK);
} catch (Exception e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index d2931ed34d..a786a86d62 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -82,6 +82,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -209,24 +210,25 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
boolean ready = false;
@TestOnly
- public MetaGroupMember() {
+ public MetaGroupMember(IStateMachine stateMachine) {
+ super(stateMachine);
groupId = new PartitionRegionId(0);
}
- public MetaGroupMember(Node thisNode, Coordinator coordinator) {
+ public MetaGroupMember(Node thisNode, Coordinator coordinator, IStateMachine stateMachine) {
super(
"Meta",
new ClientManager(
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
- ClientManager.Type.MetaGroupClient));
+ ClientManager.Type.MetaGroupClient),
+ stateMachine);
groupId = new PartitionRegionId(0);
setThisNode(thisNode);
setAllNodes(new PartitionGroup());
initPeerMap();
// committed logs are applied to the state machine (the IoTDB instance) through the applier
- LogApplier metaLogApplier = new MetaLogApplier(this);
- logManager = new MetaSingleSnapshotLogManager(metaLogApplier, this);
+ logManager = new MetaSingleSnapshotLogManager(stateMachine, this);
logSequencer = SEQUENCER_FACTORY.create(this, logManager);
term.set(logManager.getHardState().getCurrentTerm());
voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0b06c31af5..51f4bb78c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -89,6 +89,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -278,10 +279,7 @@ public abstract class RaftMember implements RaftMemberMBean {
*/
private volatile boolean skipElection = true;
- /**
- * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
- */
- protected PlanExecutor localExecutor;
+ protected IStateMachine stateMachine;
/** (logIndex, logTerm) -> append handler */
protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers =
@@ -299,11 +297,14 @@ public abstract class RaftMember implements RaftMemberMBean {
private ThreadLocal<String> threadBaseName = new ThreadLocal<>();
- protected RaftMember() {}
+ protected RaftMember(IStateMachine stateMachine) {
+ this.stateMachine = stateMachine;
+ }
- protected RaftMember(String name, ClientManager clientManager) {
+ protected RaftMember(String name, ClientManager clientManager, IStateMachine stateMachine) {
this.name = name;
this.clientManager = clientManager;
+ this.stateMachine = stateMachine;
}
/**
@@ -705,11 +706,8 @@ public abstract class RaftMember implements RaftMemberMBean {
return response;
}
- public PlanExecutor getLocalExecutor() throws QueryProcessException {
- if (localExecutor == null) {
- localExecutor = new PlanExecutor();
- }
- return localExecutor;
+ public IStateMachine getStateMachine() {
+ return stateMachine;
}
public void sendLogAsync(
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
index 52681d28c2..2b919a32f9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
@@ -35,7 +35,7 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
new Node("localhost", 30001, 1, Constants.RPC_PORT, 6667, "localhost"),
null,
null,
- null);
+ null, stateMachine);
}
public TestPartitionedLogManager(
@@ -46,7 +46,7 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
header,
new Node("localhost", 30001, 1, 40001, Constants.RPC_PORT, "localhost"),
factory,
- null);
+ null, stateMachine);
}
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 728ce7dc20..16a7f23037 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -463,7 +463,7 @@ public class DataLogApplierTest extends IoTDBTest {
@Test
public void testApplyDeletePartitionFilter() throws QueryProcessException {
- applier.setQueryExecutor(
+ applier.setS(
new PlanExecutor() {
@Override
public boolean processNonQuery(PhysicalPlan plan) {