This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_catch_up
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_catch_up by this push:
     new 34879e6bef add state machine integration
34879e6bef is described below

commit 34879e6bef9b986e7fe3f7e24417a14bf43c209d
Author: jt <[email protected]>
AuthorDate: Wed Jun 15 11:26:31 2022 +0800

    add state machine integration
---
 .../iotdb/cluster/coordinator/Coordinator.java     |  19 +-
 .../iotdb/cluster/log/applier/BaseApplier.java     | 210 +--------------------
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  99 ++--------
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |  22 ++-
 .../manage/FilePartitionedSnapshotLogManager.java  |  20 +-
 .../log/manage/MetaSingleSnapshotLogManager.java   |  14 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |   6 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  13 +-
 .../cluster/server/member/DataGroupMember.java     |  29 ++-
 .../cluster/server/member/MetaGroupMember.java     |  12 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  20 +-
 .../cluster/common/TestPartitionedLogManager.java  |   4 +-
 .../cluster/log/applier/DataLogApplierTest.java    |   2 +-
 13 files changed, 101 insertions(+), 369 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index c03b1c0a80..83824e9561 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -148,29 +148,12 @@ public class Coordinator {
 
   /** execute a non-query plan that is not necessary to be executed on other 
nodes. */
   private TSStatus executeNonQueryLocally(IConsensusRequest request) {
-    boolean execRet;
     try {
-      if (request instanceof PhysicalPlan) {
-        execRet = 
metaGroupMember.getLocalExecutor().processNonQuery(((PhysicalPlan) request));
-      } else {
-        return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
-            "Unsupported request: " + request);
-      }
-    } catch (QueryProcessException e) {
-      if (e.getErrorCode() != 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
-        logger.debug("meet error while processing non-query. ", e);
-      } else {
-        logger.warn("meet error while processing non-query. ", e);
-      }
-      return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+      return metaGroupMember.getStateMachine().write(request);
     } catch (Exception e) {
       logger.error("{}: server Internal Error: ", 
IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage());
     }
-
-    return execRet
-        ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute 
successfully")
-        : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 34767fa6e9..876be2bfde 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.BatchProcessException;
@@ -38,7 +39,6 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import 
org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -54,212 +54,14 @@ import java.util.Collections;
 /** BaseApplier use PlanExecutor to execute PhysicalPlans. */
 abstract class BaseApplier implements LogApplier {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(BaseApplier.class);
+  IStateMachine stateMachine;
 
-  MetaGroupMember metaGroupMember;
-  private PlanExecutor queryExecutor;
-
-  BaseApplier(MetaGroupMember metaGroupMember) {
-    this.metaGroupMember = metaGroupMember;
-  }
-
-  /**
-   * @param request
-   * @param dataGroupMember the data group member that is applying the log, 
null if the log is
-   *     applied by a meta group member
-   * @throws QueryProcessException
-   * @throws StorageGroupNotSetException
-   * @throws StorageEngineException
-   */
-  void applyRequest(IConsensusRequest request, DataGroupMember dataGroupMember)
-      throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
-    if (request instanceof InsertPlan) {
-      processPlanWithTolerance((InsertPlan) request, dataGroupMember);
-    } else if (request instanceof PhysicalPlan && !((PhysicalPlan) 
request).isQuery()) {
-      PhysicalPlan plan = ((PhysicalPlan) request);
-      try {
-        getQueryExecutor().processNonQuery(((PhysicalPlan) request));
-      } catch (BatchProcessException e) {
-        handleBatchProcessException(e, plan);
-      } catch (QueryProcessException e) {
-        if (e.getCause() instanceof StorageGroupNotSetException
-            || e.getCause() instanceof UndefinedTemplateException) {
-          executeAfterSync(plan);
-        } else {
-          throw e;
-        }
-      } catch (StorageGroupNotSetException e) {
-        executeAfterSync(plan);
-      }
-    } else if (request != null) {
-      throw new QueryProcessException("Unsupported request: " + request);
-    }
-  }
-
-  private void handleBatchProcessException(
-      BatchProcessException e, InsertPlan plan, DataGroupMember 
dataGroupMember)
-      throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
-    if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
-      TSStatus[] failingStatus = e.getFailingStatus();
-      for (int i = 0; i < failingStatus.length; i++) {
-        TSStatus status = failingStatus[i];
-        // skip succeeded plans in later execution
-        if (status != null
-            && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && plan instanceof BatchPlan) {
-          ((BatchPlan) plan).setIsExecuted(i);
-        }
-      }
-
-      boolean needRetry = false, hasError = false;
-      for (int i = 0, failingStatusLength = failingStatus.length; i < 
failingStatusLength; i++) {
-        TSStatus status = failingStatus[i];
-        if (status != null) {
-          if (status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
-              && plan instanceof BatchPlan) {
-            ((BatchPlan) plan).unsetIsExecuted(i);
-            needRetry = true;
-          } else if (status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-            hasError = true;
-          }
-        }
-      }
-      if (hasError) {
-        throw e;
-      }
-      if (needRetry) {
-        pullTimeseriesSchema(plan, dataGroupMember.getHeader());
-        plan.recoverFromFailure();
-        getQueryExecutor().processNonQuery(plan);
-      }
-    } else {
-      throw e;
-    }
-  }
-
-  private void handleBatchProcessException(BatchProcessException e, 
PhysicalPlan plan)
-      throws QueryProcessException, StorageEngineException, 
StorageGroupNotSetException {
-    TSStatus[] failingStatus = e.getFailingStatus();
-    boolean needThrow = false;
-    for (int i = 0; i < failingStatus.length; i++) {
-      TSStatus status = failingStatus[i];
-      // skip succeeded plans in later execution
-      if (status != null
-          && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && plan instanceof BatchPlan) {
-        ((BatchPlan) plan).setIsExecuted(i);
-      }
-
-      if (plan instanceof DeleteTimeSeriesPlan) {
-        if (status != null && status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          if (status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
-            logger.info("{} doesn't exist, it may has been deleted.", 
plan.getPaths().get(i));
-          } else {
-            needThrow = true;
-          }
-        }
-      }
-    }
-    boolean needRetry = false;
-    for (int i = 0, failingStatusLength = failingStatus.length; i < 
failingStatusLength; i++) {
-      TSStatus status = failingStatus[i];
-      if (status != null
-          && (status.getCode() == 
TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode()
-              || status.getCode() == 
TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode())
-          && plan instanceof BatchPlan) {
-        ((BatchPlan) plan).unsetIsExecuted(i);
-        needRetry = true;
-      }
-    }
-    if (needRetry) {
-      executeAfterSync(plan);
-      return;
-    }
-
-    if (!(plan instanceof DeleteTimeSeriesPlan) || needThrow) {
-      throw e;
-    }
-  }
-
-  private void executeAfterSync(PhysicalPlan plan)
-      throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
-    try {
-      metaGroupMember.syncLeaderWithConsistencyCheck(true);
-    } catch (CheckConsistencyException ce) {
-      throw new QueryProcessException(ce.getMessage());
-    }
-    getQueryExecutor().processNonQuery(plan);
-  }
-
-  /**
-   * @param plan
-   * @param dataGroupMember the data group member that is applying the log, 
null if the log is
-   *     applied by a meta group member
-   * @throws QueryProcessException
-   * @throws StorageGroupNotSetException
-   * @throws StorageEngineException
-   */
-  private void processPlanWithTolerance(InsertPlan plan, DataGroupMember 
dataGroupMember)
-      throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
-    try {
-      getQueryExecutor().processNonQuery(plan);
-    } catch (BatchProcessException e) {
-      handleBatchProcessException(e, plan, dataGroupMember);
-    } catch (QueryProcessException | StorageGroupNotSetException | 
StorageEngineException e) {
-      if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
-        // check if this is caused by metadata missing, if so, pull metadata 
and retry
-        Throwable metaMissingException = 
SchemaUtils.findMetaMissingException(e);
-        boolean causedByPathNotExist = metaMissingException instanceof 
PathNotExistException;
-
-        if (causedByPathNotExist) {
-          if (logger.isDebugEnabled()) {
-            logger.debug(
-                "Timeseries is not found locally[{}], try pulling it from 
another group: {}",
-                metaGroupMember.getName(),
-                e.getCause().getMessage());
-          }
-          pullTimeseriesSchema(plan, dataGroupMember.getHeader());
-          plan.recoverFromFailure();
-          getQueryExecutor().processNonQuery(plan);
-        } else {
-          throw e;
-        }
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  /**
-   * @param plan
-   * @param ignoredGroup do not pull schema from the group to avoid backward 
dependency
-   * @throws QueryProcessException
-   */
-  private void pullTimeseriesSchema(InsertPlan plan, RaftNode ignoredGroup)
-      throws QueryProcessException {
-    try {
-      if (plan instanceof BatchPlan) {
-        MetaPuller.getInstance()
-            .pullTimeSeriesSchemas(((BatchPlan) plan).getPrefixPaths(), 
ignoredGroup);
-      } else {
-        PartialPath path = plan.getDevicePath();
-        MetaPuller.getInstance()
-            .pullTimeSeriesSchemas(Collections.singletonList(path), 
ignoredGroup);
-      }
-    } catch (MetadataException e1) {
-      throw new QueryProcessException(e1);
-    }
-  }
-
-  private PlanExecutor getQueryExecutor() throws QueryProcessException {
-    if (queryExecutor == null) {
-      queryExecutor = new ClusterPlanExecutor(metaGroupMember);
-    }
-    return queryExecutor;
+  BaseApplier(IStateMachine stateMachine) {
+    this.stateMachine = stateMachine;
   }
 
   @TestOnly
-  public void setQueryExecutor(PlanExecutor queryExecutor) {
-    this.queryExecutor = queryExecutor;
+  public void setStateMachine(IStateMachine stateMachine) {
+    this.stateMachine = stateMachine;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 95d1f126c1..266e61c332 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -20,32 +20,23 @@
 package org.apache.iotdb.cluster.log.applier;
 
 import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.IOUtils;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.service.IoTDB;
-
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,8 +49,9 @@ public class DataLogApplier extends BaseApplier {
 
   protected DataGroupMember dataGroupMember;
 
-  public DataLogApplier(MetaGroupMember metaGroupMember, DataGroupMember 
dataGroupMember) {
-    super(metaGroupMember);
+  public DataLogApplier(DataGroupMember dataGroupMember,
+      IStateMachine stateMachine) {
+    super(stateMachine);
     this.dataGroupMember = dataGroupMember;
   }
 
@@ -83,7 +75,10 @@ public class DataLogApplier extends BaseApplier {
       } else if (log instanceof RequestLog) {
         RequestLog requestLog = (RequestLog) log;
         IConsensusRequest request = requestLog.getRequest();
-        applyRequest(request);
+        TSStatus status = applyRequest(request);
+        if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          log.setException(new QueryProcessException(status.message, 
status.code));
+        }
       } else if (log instanceof CloseFileLog) {
         CloseFileLog closeFileLog = ((CloseFileLog) log);
         StorageEngine.getInstance()
@@ -106,81 +101,13 @@ public class DataLogApplier extends BaseApplier {
     }
   }
 
-  public void applyRequest(IConsensusRequest request)
-      throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
+  public TSStatus applyRequest(IConsensusRequest request) {
     if (request instanceof DeletePlan) {
       ((DeletePlan) 
request).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
     } else if (request instanceof DeleteTimeSeriesPlan) {
       ((DeleteTimeSeriesPlan) 
request).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
     }
-    if (request instanceof InsertMultiTabletsPlan) {
-      applyInsert((InsertMultiTabletsPlan) request);
-    } else if (request instanceof InsertRowsPlan) {
-      applyInsert((InsertRowsPlan) request);
-    } else if (request instanceof InsertPlan) {
-      applyInsert((InsertPlan) request);
-    } else {
-      applyRequest(request, dataGroupMember);
-    }
-  }
 
-  private void applyInsert(InsertMultiTabletsPlan plan)
-      throws StorageGroupNotSetException, QueryProcessException, 
StorageEngineException {
-    boolean hasSync = false;
-    for (InsertTabletPlan insertTabletPlan : plan.getInsertTabletPlanList()) {
-      try {
-        
IoTDB.schemaProcessor.getBelongedStorageGroup(insertTabletPlan.getDevicePath());
-      } catch (StorageGroupNotSetException e) {
-        try {
-          if (!hasSync) {
-            metaGroupMember.syncLeaderWithConsistencyCheck(true);
-            hasSync = true;
-          } else {
-            throw new StorageEngineException(e.getMessage());
-          }
-        } catch (CheckConsistencyException ce) {
-          throw new QueryProcessException(ce.getMessage());
-        }
-      }
-    }
-    applyRequest(plan, dataGroupMember);
-  }
-
-  private void applyInsert(InsertRowsPlan plan)
-      throws StorageGroupNotSetException, QueryProcessException, 
StorageEngineException {
-    boolean hasSync = false;
-    for (InsertRowPlan insertRowPlan : plan.getInsertRowPlanList()) {
-      try {
-        
IoTDB.schemaProcessor.getBelongedStorageGroup(insertRowPlan.getDevicePath());
-      } catch (StorageGroupNotSetException e) {
-        try {
-          if (!hasSync) {
-            metaGroupMember.syncLeaderWithConsistencyCheck(true);
-            hasSync = true;
-          } else {
-            throw new StorageEngineException(e.getMessage());
-          }
-        } catch (CheckConsistencyException ce) {
-          throw new QueryProcessException(ce.getMessage());
-        }
-      }
-    }
-    applyRequest(plan, dataGroupMember);
-  }
-
-  private void applyInsert(InsertPlan plan)
-      throws StorageGroupNotSetException, QueryProcessException, 
StorageEngineException {
-    try {
-      IoTDB.schemaProcessor.getBelongedStorageGroup(plan.getDevicePath());
-    } catch (StorageGroupNotSetException e) {
-      // the sg may not exist because the node does not catch up with the 
leader, retry after
-      // synchronization
-      try {
-        metaGroupMember.syncLeaderWithConsistencyCheck(true);
-      } catch (CheckConsistencyException ce) {
-        throw new QueryProcessException(ce.getMessage());
-      }
-    }
-    applyRequest(plan, dataGroupMember);
+    return stateMachine.write(request);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index c75c15a914..1259c0df84 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.db.mpp.execution.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,19 +40,19 @@ public class MetaLogApplier extends BaseApplier {
   private static final Logger logger = 
LoggerFactory.getLogger(MetaLogApplier.class);
   private MetaGroupMember member;
 
-  public MetaLogApplier(MetaGroupMember member) {
-    super(member);
+  public MetaLogApplier(MetaGroupMember member, IStateMachine stateMachine) {
+    super(stateMachine);
     this.member = member;
   }
 
   @Override
   public void apply(Log log) {
     try {
-      logger.debug("MetaMember [{}] starts applying Log {}", 
metaGroupMember.getName(), log);
+      logger.debug("MetaMember [{}] starts applying Log {}", member.getName(), 
log);
       if (log instanceof AddNodeLog) {
         applyAddNodeLog((AddNodeLog) log);
       } else if (log instanceof RequestLog) {
-        applyRequest(((RequestLog) log).getRequest(), null);
+        stateMachine.write(((RequestLog) log).getRequest());
       } else if (log instanceof RemoveNodeLog) {
         applyRemoveNodeLog((RemoveNodeLog) log);
       } else if (log instanceof EmptyContentLog || log instanceof 
FragmentedLog) {
@@ -67,24 +69,24 @@ public class MetaLogApplier extends BaseApplier {
   }
 
   private void applyAddNodeLog(AddNodeLog log) throws 
ChangeMembershipException {
-    if 
(!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+    if (!member.getPartitionTable().deserialize(log.getPartitionTable())) {
       logger.info("Ignore previous change membership log");
       // ignore previous change membership log
       return;
     }
-    if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
-      metaGroupMember.getCoordinator().sendLogToAllDataGroups(log);
+    if (member.getCharacter() == NodeCharacter.LEADER) {
+      member.getCoordinator().sendLogToAllDataGroups(log);
     }
     member.applyAddNode(log);
   }
 
   private void applyRemoveNodeLog(RemoveNodeLog log) throws 
ChangeMembershipException {
-    if 
(!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+    if (!member.getPartitionTable().deserialize(log.getPartitionTable())) {
       // ignore previous change membership log
       return;
     }
-    if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
-      metaGroupMember.getCoordinator().sendLogToAllDataGroups(log);
+    if (member.getCharacter() == NodeCharacter.LEADER) {
+      member.getCoordinator().sendLogToAllDataGroups(log);
     }
     member.applyRemoveNode(log);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 3f25525ad6..656def1b65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -20,9 +20,12 @@
 package org.apache.iotdb.cluster.log.manage;
 
 import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.EntryCompactedException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.applier.AsyncDataLogApplier;
+import org.apache.iotdb.cluster.log.applier.DataLogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
@@ -32,8 +35,10 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.execution.StateMachine;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 
@@ -62,12 +67,23 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
       LoggerFactory.getLogger(FilePartitionedSnapshotLogManager.class);
 
   public FilePartitionedSnapshotLogManager(
-      LogApplier logApplier,
+      IStateMachine stateMachine,
       PartitionTable partitionTable,
       Node header,
       Node thisNode,
       DataGroupMember dataGroupMember) {
-    super(logApplier, partitionTable, header, thisNode, Factory.INSTANCE, 
dataGroupMember);
+    super(createLogApplier(dataGroupMember, stateMachine), partitionTable, 
header, thisNode, Factory.INSTANCE,
+        dataGroupMember, stateMachine);
+  }
+
+  private static LogApplier createLogApplier(
+      DataGroupMember dataGroupMember, IStateMachine stateMachine) {
+    LogApplier applier = new DataLogApplier(dataGroupMember, stateMachine);
+    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
+        && ClusterDescriptor.getInstance().getConfig().getReplicationNum() != 
1) {
+      applier = new AsyncDataLogApplier(applier, dataGroupMember.getName());
+    }
+    return applier;
   }
 
   /** send FlushPlan to all nodes in one dataGroup */
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index d9bc9e8f9f..32c7d19ce5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.cluster.log.manage;
 
-import org.apache.iotdb.cluster.log.LogApplier;
+import java.io.IOException;
+import java.util.Map;
 import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
 import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -30,16 +32,13 @@ import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
 import org.apache.iotdb.commons.auth.entity.Role;
 import org.apache.iotdb.commons.auth.entity.User;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
 import org.apache.iotdb.db.service.IoTDB;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Map;
-
 /** MetaSingleSnapshotLogManager provides a MetaSimpleSnapshot as snapshot. */
 public class MetaSingleSnapshotLogManager extends RaftLogManager {
 
@@ -52,8 +51,9 @@ public class MetaSingleSnapshotLogManager extends RaftLogManager {
   private long commitIndex;
   private long term;
 
-  public MetaSingleSnapshotLogManager(LogApplier logApplier, MetaGroupMember 
metaGroupMember) {
-    super(new SyncLogDequeSerializer(0), logApplier, 
metaGroupMember.getName());
+  public MetaSingleSnapshotLogManager(IStateMachine stateMachine, 
MetaGroupMember metaGroupMember) {
+    super(new SyncLogDequeSerializer(0), new MetaLogApplier(metaGroupMember, 
stateMachine),
+        metaGroupMember.getName(), stateMachine);
     this.metaGroupMember = metaGroupMember;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 88713aacc5..8e2c7f46c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
@@ -73,11 +74,12 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
       Node header,
       Node thisNode,
       SnapshotFactory<T> factory,
-      DataGroupMember dataGroupMember) {
+      DataGroupMember dataGroupMember, IStateMachine stateMachine) {
     super(
         new SyncLogDequeSerializer(header.nodeIdentifier),
         logApplier,
-        Integer.toString(header.getNodeIdentifier()));
+        Integer.toString(header.getNodeIdentifier()),
+        stateMachine);
     this.partitionTable = partitionTable;
     this.factory = factory;
     this.thisNode = thisNode;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index d65e060da9..ec2dbcbd6e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -31,11 +31,13 @@ import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.StableEntryManager;
 import org.apache.iotdb.cluster.log.manage.serializable.LogManagerMeta;
+import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.utils.TestOnly;
 
+import org.apache.iotdb.consensus.IStateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,9 +113,14 @@ public abstract class RaftLogManager {
 
   protected List<Log> blockedUnappliedLogList;
 
-  protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier 
applier, String name) {
+  protected IStateMachine stateMachine;
+
+  protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier 
applier, String name
+      , IStateMachine stateMachine) {
     this.logApplier = applier;
     this.name = name;
+    this.stateMachine = stateMachine;
+
     LogManagerMeta meta = stableEntryManager.getMeta();
     this.setCommittedEntryManager(new CommittedEntryManager(maxNumOfLogsInMem, 
meta));
     this.setStableEntryManager(stableEntryManager);
@@ -1071,8 +1078,4 @@ public abstract class RaftLogManager {
   public long getBlockAppliedCommitIndex() {
     return blockAppliedCommitIndex;
   }
-
-  public RaftLogManager(LogApplier logApplier) {
-    this.logApplier = logApplier;
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 173b7bcc9c..7c0647b24d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -84,6 +84,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -171,7 +172,6 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
 
   private LocalQueryExecutor localQueryExecutor;
 
-  LogApplier dataLogApplier;
   /**
    * When a new partition table is installed, all data members will be checked 
if unchanged. If not,
    * such members will be removed.
@@ -181,7 +181,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
   private LastAppliedPatitionTableVersion lastAppliedPartitionTableVersion;
 
   @TestOnly
-  public DataGroupMember(Node thisNode, PartitionGroup nodes) {
+  public DataGroupMember(Node thisNode, PartitionGroup nodes, IStateMachine 
stateMachine) {
+    super(stateMachine);
     // constructor for test
     this.name =
         "Data-"
@@ -211,7 +212,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
     logSequencer = SEQUENCER_FACTORY.create(this, logManager);
   }
 
-  DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember 
metaGroupMember) {
+  DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember 
metaGroupMember,
+      IStateMachine stateMachine) {
     // The name is used in JMX, so we have to avoid to use "(" "," "=" ")"
     super(
         "Data-"
@@ -223,7 +225,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
             + "",
         new ClientManager(
             ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
-            ClientManager.Type.DataGroupClient));
+            ClientManager.Type.DataGroupClient),
+        stateMachine);
     groupId = new DataRegionId(nodes.getHeader().node.nodeIdentifier + 
nodes.getHeader().raftId);
     this.metaGroupMember = metaGroupMember;
     setThisNode(thisNode);
@@ -237,14 +240,10 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
             getRaftGroupId());
     setQueryManager(new ClusterQueryManager());
     slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(), 
getName());
-    dataLogApplier = new DataLogApplier(metaGroupMember, this);
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
-        && ClusterDescriptor.getInstance().getConfig().getReplicationNum() != 
1) {
-      dataLogApplier = new AsyncDataLogApplier(dataLogApplier, name);
-    }
+
     logManager =
         new FilePartitionedSnapshotLogManager(
-            dataLogApplier, metaGroupMember.getPartitionTable(), 
allNodes.get(0), thisNode, this);
+            stateMachine, metaGroupMember.getPartitionTable(), 
allNodes.get(0), thisNode, this);
     logSequencer = SEQUENCER_FACTORY.create(this, logManager);
     initPeerMap();
     term.set(logManager.getHardState().getCurrentTerm());
@@ -343,16 +342,14 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
 
   public static class Factory {
 
-    private TProtocolFactory protocolFactory;
     private MetaGroupMember metaGroupMember;
 
-    public Factory(TProtocolFactory protocolFactory, MetaGroupMember 
metaGroupMember) {
-      this.protocolFactory = protocolFactory;
+    public Factory(MetaGroupMember metaGroupMember) {
       this.metaGroupMember = metaGroupMember;
     }
 
-    public DataGroupMember create(Node thisNode, PartitionGroup 
partitionGroup) {
-      return new DataGroupMember(thisNode, partitionGroup, metaGroupMember);
+    public DataGroupMember create(Node thisNode, PartitionGroup 
partitionGroup, IStateMachine stateMachine) {
+      return new DataGroupMember(thisNode, partitionGroup, metaGroupMember, 
stateMachine);
     }
   }
 
@@ -717,7 +714,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
           }
           handleChangeMembershipLogWithoutRaft(log);
         } else {
-          ((DataLogApplier) dataLogApplier).applyRequest(request);
+          return new ConsensusWriteResponse(null, stateMachine.write(request));
         }
         return new ConsensusWriteResponse(null, StatusUtils.OK);
       } catch (Exception e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index d2931ed34d..a786a86d62 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -82,6 +82,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -209,24 +210,25 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
   boolean ready = false;
 
   @TestOnly
-  public MetaGroupMember() {
+  public MetaGroupMember(IStateMachine stateMachine) {
+    super(stateMachine);
     groupId = new PartitionRegionId(0);
   }
 
-  public MetaGroupMember(Node thisNode, Coordinator coordinator) {
+  public MetaGroupMember(Node thisNode, Coordinator coordinator, IStateMachine 
stateMachine) {
     super(
         "Meta",
         new ClientManager(
             ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
-            ClientManager.Type.MetaGroupClient));
+            ClientManager.Type.MetaGroupClient),
+        stateMachine);
     groupId = new PartitionRegionId(0);
     setThisNode(thisNode);
     setAllNodes(new PartitionGroup());
     initPeerMap();
 
     // committed logs are applied to the state machine (the IoTDB instance) 
through the applier
-    LogApplier metaLogApplier = new MetaLogApplier(this);
-    logManager = new MetaSingleSnapshotLogManager(metaLogApplier, this);
+    logManager = new MetaSingleSnapshotLogManager(stateMachine, this);
     logSequencer = SEQUENCER_FACTORY.create(this, logManager);
     term.set(logManager.getHardState().getCurrentTerm());
     voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0b06c31af5..51f4bb78c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -89,6 +89,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -278,10 +279,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    */
   private volatile boolean skipElection = true;
 
-  /**
-   * localExecutor is used to directly execute plans like load configuration 
in the underlying IoTDB
-   */
-  protected PlanExecutor localExecutor;
+  protected IStateMachine stateMachine;
 
   /** (logIndex, logTerm) -> append handler */
   protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers =
@@ -299,11 +297,14 @@ public abstract class RaftMember implements RaftMemberMBean {
 
   private ThreadLocal<String> threadBaseName = new ThreadLocal<>();
 
-  protected RaftMember() {}
+  protected RaftMember(IStateMachine stateMachine) {
+    this.stateMachine = stateMachine;
+  }
 
-  protected RaftMember(String name, ClientManager clientManager) {
+  protected RaftMember(String name, ClientManager clientManager, IStateMachine 
stateMachine) {
     this.name = name;
     this.clientManager = clientManager;
+    this.stateMachine = stateMachine;
   }
 
   /**
@@ -705,11 +706,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     return response;
   }
 
-  public PlanExecutor getLocalExecutor() throws QueryProcessException {
-    if (localExecutor == null) {
-      localExecutor = new PlanExecutor();
-    }
-    return localExecutor;
+  public IStateMachine getStateMachine() {
+    return stateMachine;
   }
 
   public void sendLogAsync(
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
index 52681d28c2..2b919a32f9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
@@ -35,7 +35,7 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
         new Node("localhost", 30001, 1, Constants.RPC_PORT, 6667, "localhost"),
         null,
         null,
-        null);
+        null, stateMachine);
   }
 
   public TestPartitionedLogManager(
@@ -46,7 +46,7 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
         header,
         new Node("localhost", 30001, 1, 40001, Constants.RPC_PORT, 
"localhost"),
         factory,
-        null);
+        null, stateMachine);
   }
 
   @Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 728ce7dc20..16a7f23037 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -463,7 +463,7 @@ public class DataLogApplierTest extends IoTDBTest {
 
   @Test
   public void testApplyDeletePartitionFilter() throws QueryProcessException {
-    applier.setQueryExecutor(
+    applier.setS(
         new PlanExecutor() {
           @Override
           public boolean processNonQuery(PhysicalPlan plan) {

Reply via email to