This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_catch_up
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_catch_up by this push:
     new e2e2959a7f replace physical plan with IConsensusRequest
e2e2959a7f is described below

commit e2e2959a7f1888010af4ca14c8234a4d52d7358c
Author: jt <[email protected]>
AuthorDate: Wed Jun 15 10:17:20 2022 +0800

    replace physical plan with IConsensusRequest
---
 .../cluster/client/sync/SyncClientAdaptor.java     |   5 +-
 .../iotdb/cluster/coordinator/Coordinator.java     |  37 ++++---
 .../cluster/impl/NativeSingleRaftConsensus.java    |   6 +-
 .../org/apache/iotdb/cluster/log/LogParser.java    |   8 +-
 .../cluster/log/applier/AsyncDataLogApplier.java   |   8 +-
 .../iotdb/cluster/log/applier/BaseApplier.java     |  18 ++--
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  41 ++++----
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |   6 +-
 .../iotdb/cluster/log/logtypes/RequestLog.java     |  43 ++++----
 .../log/sequencing/AsynchronousSequencer.java      |   6 +-
 .../log/sequencing/SynchronousSequencer.java       |   8 +-
 .../handlers/forwarder/ForwardPlanHandler.java     |  12 +--
 .../cluster/server/member/DataGroupMember.java     |  77 ++++++++------
 .../cluster/server/member/MetaGroupMember.java     |  19 +++-
 .../iotdb/cluster/server/member/RaftMember.java    | 116 +++++++++++----------
 .../cluster/server/service/BaseAsyncService.java   |  11 +-
 .../cluster/server/service/BaseSyncService.java    |   8 +-
 .../apache/iotdb/cluster/utils/PartitionUtils.java |   7 +-
 .../iotdb/cluster/common/TestLogApplier.java       |   8 +-
 .../apache/iotdb/cluster/log/LogParserTest.java    |   6 +-
 .../log/applier/AsyncDataLogApplierTest.java       |  18 ++--
 .../cluster/log/applier/DataLogApplierTest.java    |  32 +++---
 .../cluster/log/applier/MetaLogApplierTest.java    |  12 +--
 .../cluster/log/logtypes/SerializeLogTest.java     |   8 +-
 .../cluster/server/member/DataGroupMemberTest.java |   4 +-
 .../iotdb/cluster/utils/SerializeUtilTest.java     |   6 +-
 26 files changed, 298 insertions(+), 232 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index b066bc86a0..29eee1ea21 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.SerializeUtils;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -423,11 +424,11 @@ public class SyncClientAdaptor {
   }
 
   public static TSStatus executeNonQuery(
-      AsyncClient client, PhysicalPlan plan, RaftNode header, Node receiver)
+      AsyncClient client, IConsensusRequest plan, RaftNode header, Node 
receiver)
       throws IOException, TException, InterruptedException {
     AtomicReference<TSStatus> status = new AtomicReference<>();
     ExecutNonQueryReq req = new ExecutNonQueryReq();
-    req.planBytes = 
ByteBuffer.wrap(PlanSerializer.getInstance().serialize(plan));
+    req.planBytes = plan.serializeToByteBuffer();
     if (header != null) {
       req.setHeader(header);
     }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index bfd48894b5..c03b1c0a80 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -116,37 +117,45 @@ public class Coordinator {
    * nodes (like timeseries deletion) or the nodes that belong to certain 
groups (like data
    * ingestion).
    *
-   * @param plan a non-query plan.
+   * @param request a non-query plan.
    */
-  public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+  public TSStatus executeNonQueryPlan(IConsensusRequest request) {
     TSStatus result;
     long startTime = 
Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.getOperationStartTime();
-    if (PartitionUtils.isLocalNonQueryPlan(plan)) {
+    if (PartitionUtils.isLocalNonQueryPlan(request)) {
       // run locally
-      result = executeNonQueryLocally(plan);
-    } else if (PartitionUtils.isGlobalMetaPlan(plan)) {
+      result = executeNonQueryLocally(request);
+    } else if (PartitionUtils.isGlobalMetaPlan(request)) {
       // forward the plan to all meta group nodes
-      result = metaGroupMember.processNonPartitionedMetaPlan(plan);
-    } else if (PartitionUtils.isGlobalDataPlan(plan)) {
+      result = metaGroupMember.processNonPartitionedMetaPlan(request);
+    } else if (PartitionUtils.isGlobalDataPlan(request)) {
       // forward the plan to all data group nodes
-      result = processNonPartitionedDataPlan(plan);
-    } else {
+      result = processNonPartitionedDataPlan(((PhysicalPlan) request));
+    } else if (request instanceof PhysicalPlan) {
       // split the plan and forward them to some PartitionGroups
       try {
-        result = processPartitionedPlan(plan);
+        result = processPartitionedPlan(((PhysicalPlan) request));
       } catch (UnsupportedPlanException e) {
         return StatusUtils.getStatus(StatusUtils.UNSUPPORTED_OPERATION, 
e.getMessage());
       }
+    } else {
+      result = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
+          "Unsupported request: " + request);
     }
     
Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
     return result;
   }
 
   /** execute a non-query plan that is not necessary to be executed on other 
nodes. */
-  private TSStatus executeNonQueryLocally(PhysicalPlan plan) {
+  private TSStatus executeNonQueryLocally(IConsensusRequest request) {
     boolean execRet;
     try {
-      execRet = metaGroupMember.getLocalExecutor().processNonQuery(plan);
+      if (request instanceof PhysicalPlan) {
+        execRet = 
metaGroupMember.getLocalExecutor().processNonQuery(((PhysicalPlan) request));
+      } else {
+        return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
+            "Unsupported request: " + request);
+      }
     } catch (QueryProcessException e) {
       if (e.getErrorCode() != 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
         logger.debug("meet error while processing non-query. ", e);
@@ -299,7 +308,7 @@ public class Coordinator {
         status =
             metaGroupMember
                 .getLocalDataMember(partitionGroup.getHeader())
-                .executeNonQueryPlan(plan);
+                .executeRequest(plan).getStatus();
         logger.debug(
             "Execute {} in a local group of {} with status {}",
             plan,
@@ -479,7 +488,7 @@ public class Coordinator {
       result =
           metaGroupMember
               .getLocalDataMember(entry.getValue().getHeader())
-              .executeNonQueryPlan(entry.getKey());
+              .executeRequest(entry.getKey()).getStatus();
       logger.debug(
           "Execute {} in a local group of {}, {}",
           entry.getKey(),
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
index eac51c293a..622287d7a9 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
@@ -46,7 +46,7 @@ public class NativeSingleRaftConsensus implements 
ISingleConsensus {
 
   @Override
   public ConsensusWriteResponse write(IConsensusRequest request) {
-    return null;
+    return raftMember.executeRequest(request);
   }
 
   @Override
@@ -81,11 +81,11 @@ public class NativeSingleRaftConsensus implements 
ISingleConsensus {
 
   @Override
   public boolean isLeader() {
-    return false;
+    return raftMember.isLeader();
   }
 
   @Override
   public Peer getLeader() {
-    return null;
+    return raftMember.getLeaderPeer();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index 26f8a42192..b434873f1a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 
 import org.slf4j.Logger;
@@ -72,9 +72,9 @@ public class LogParser {
         log = addNodeLog;
         break;
       case PHYSICAL_PLAN:
-        PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog();
-        physicalPlanLog.deserialize(buffer);
-        log = physicalPlanLog;
+        RequestLog requestLog = new RequestLog();
+        requestLog.deserialize(buffer);
+        log = requestLog;
         break;
       case CLOSE_FILE:
         CloseFileLog closeFileLog = new CloseFileLog();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index ebcc885564..127a590b68 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.log.applier;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -116,9 +116,9 @@ public class AsyncDataLogApplier implements LogApplier {
   private PartialPath getLogKey(Log log) throws StorageGroupNotSetException {
     // we can only apply some kinds of plans in parallel, for other logs, we 
must wait until all
     // previous logs are applied, or the order of deletions and insertions may 
get wrong
-    if (log instanceof PhysicalPlanLog) {
-      PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
-      PhysicalPlan plan = physicalPlanLog.getPlan();
+    if (log instanceof RequestLog) {
+      RequestLog requestLog = (RequestLog) log;
+      PhysicalPlan plan = requestLog.getRequest();
       // this plan only affects one sg, so we can run it with other plans in 
parallel
       return getPlanKey(plan);
     } else if (log instanceof CloseFileLog) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 996d083d77..34767fa6e9 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -63,20 +64,21 @@ abstract class BaseApplier implements LogApplier {
   }
 
   /**
-   * @param plan
+   * @param request
    * @param dataGroupMember the data group member that is applying the log, 
null if the log is
    *     applied by a meta group member
    * @throws QueryProcessException
    * @throws StorageGroupNotSetException
    * @throws StorageEngineException
    */
-  void applyPhysicalPlan(PhysicalPlan plan, DataGroupMember dataGroupMember)
+  void applyRequest(IConsensusRequest request, DataGroupMember dataGroupMember)
       throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
-    if (plan instanceof InsertPlan) {
-      processPlanWithTolerance((InsertPlan) plan, dataGroupMember);
-    } else if (plan != null && !plan.isQuery()) {
+    if (request instanceof InsertPlan) {
+      processPlanWithTolerance((InsertPlan) request, dataGroupMember);
+    } else if (request instanceof PhysicalPlan && !((PhysicalPlan) 
request).isQuery()) {
+      PhysicalPlan plan = ((PhysicalPlan) request);
       try {
-        getQueryExecutor().processNonQuery(plan);
+        getQueryExecutor().processNonQuery(((PhysicalPlan) request));
       } catch (BatchProcessException e) {
         handleBatchProcessException(e, plan);
       } catch (QueryProcessException e) {
@@ -89,8 +91,8 @@ abstract class BaseApplier implements LogApplier {
       } catch (StorageGroupNotSetException e) {
         executeAfterSync(plan);
       }
-    } else if (plan != null) {
-      logger.error("Unsupported physical plan: {}", plan);
+    } else if (request != null) {
+      throw new QueryProcessException("Unsupported request: " + request);
     }
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index b2f138f71e..95d1f126c1 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -24,12 +24,13 @@ import 
org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -79,10 +80,10 @@ public class DataLogApplier extends BaseApplier {
             .preRemoveNodeForDataGroup((RemoveNodeLog) log, dataGroupMember);
         dataGroupMember.setAndSaveLastAppliedPartitionTableVersion(
             ((RemoveNodeLog) log).getMetaLogIndex());
-      } else if (log instanceof PhysicalPlanLog) {
-        PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
-        PhysicalPlan plan = physicalPlanLog.getPlan();
-        applyPhysicalPlan(plan);
+      } else if (log instanceof RequestLog) {
+        RequestLog requestLog = (RequestLog) log;
+        IConsensusRequest request = requestLog.getRequest();
+        applyRequest(request);
       } else if (log instanceof CloseFileLog) {
         CloseFileLog closeFileLog = ((CloseFileLog) log);
         StorageEngine.getInstance()
@@ -105,21 +106,21 @@ public class DataLogApplier extends BaseApplier {
     }
   }
 
-  public void applyPhysicalPlan(PhysicalPlan plan)
+  public void applyRequest(IConsensusRequest request)
       throws QueryProcessException, StorageGroupNotSetException, 
StorageEngineException {
-    if (plan instanceof DeletePlan) {
-      ((DeletePlan) 
plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
-    } else if (plan instanceof DeleteTimeSeriesPlan) {
-      ((DeleteTimeSeriesPlan) 
plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
+    if (request instanceof DeletePlan) {
+      ((DeletePlan) 
request).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
+    } else if (request instanceof DeleteTimeSeriesPlan) {
+      ((DeleteTimeSeriesPlan) 
request).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
     }
-    if (plan instanceof InsertMultiTabletsPlan) {
-      applyInsert((InsertMultiTabletsPlan) plan);
-    } else if (plan instanceof InsertRowsPlan) {
-      applyInsert((InsertRowsPlan) plan);
-    } else if (plan instanceof InsertPlan) {
-      applyInsert((InsertPlan) plan);
+    if (request instanceof InsertMultiTabletsPlan) {
+      applyInsert((InsertMultiTabletsPlan) request);
+    } else if (request instanceof InsertRowsPlan) {
+      applyInsert((InsertRowsPlan) request);
+    } else if (request instanceof InsertPlan) {
+      applyInsert((InsertPlan) request);
     } else {
-      applyPhysicalPlan(plan, dataGroupMember);
+      applyRequest(request, dataGroupMember);
     }
   }
 
@@ -142,7 +143,7 @@ public class DataLogApplier extends BaseApplier {
         }
       }
     }
-    applyPhysicalPlan(plan, dataGroupMember);
+    applyRequest(plan, dataGroupMember);
   }
 
   private void applyInsert(InsertRowsPlan plan)
@@ -164,7 +165,7 @@ public class DataLogApplier extends BaseApplier {
         }
       }
     }
-    applyPhysicalPlan(plan, dataGroupMember);
+    applyRequest(plan, dataGroupMember);
   }
 
   private void applyInsert(InsertPlan plan)
@@ -180,6 +181,6 @@ public class DataLogApplier extends BaseApplier {
         throw new QueryProcessException(ce.getMessage());
       }
     }
-    applyPhysicalPlan(plan, dataGroupMember);
+    applyRequest(plan, dataGroupMember);
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index af3163cdd4..c75c15a914 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -49,8 +49,8 @@ public class MetaLogApplier extends BaseApplier {
       logger.debug("MetaMember [{}] starts applying Log {}", 
metaGroupMember.getName(), log);
       if (log instanceof AddNodeLog) {
         applyAddNodeLog((AddNodeLog) log);
-      } else if (log instanceof PhysicalPlanLog) {
-        applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
+      } else if (log instanceof RequestLog) {
+        applyRequest(((RequestLog) log).getRequest(), null);
       } else if (log instanceof RemoveNodeLog) {
         applyRemoveNodeLog((RemoveNodeLog) log);
       } else if (log instanceof EmptyContentLog || log instanceof 
FragmentedLog) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
index 5815ecc8b8..4f797d459f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log.logtypes;
 
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -35,23 +36,23 @@ import java.util.Objects;
 
 import static org.apache.iotdb.cluster.log.Log.Types.PHYSICAL_PLAN;
 
-/** PhysicalPlanLog contains a non-partitioned physical plan like set storage 
group. */
-public class PhysicalPlanLog extends Log {
+/** RequestLog contains a non-partitioned request like set storage group. */
+public class RequestLog extends Log {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(PhysicalPlanLog.class);
-  private PhysicalPlan plan;
+  private static final Logger logger = 
LoggerFactory.getLogger(RequestLog.class);
+  private IConsensusRequest request;
 
-  public PhysicalPlanLog() {}
+  public RequestLog() {}
 
-  public PhysicalPlanLog(PhysicalPlan plan) {
-    this.plan = plan;
+  public RequestLog(IConsensusRequest request) {
+    this.request = request;
   }
 
   @Override
   public int getDefaultBufferSize() {
-    if (plan instanceof DummyPlan) {
+    if (request instanceof DummyPlan) {
       int workloadSize =
-          ((DummyPlan) plan).getWorkload() == null ? 0 : ((DummyPlan) 
plan).getWorkload().length;
+          ((DummyPlan) request).getWorkload() == null ? 0 : ((DummyPlan) 
request).getWorkload().length;
       return workloadSize + 512;
     }
     return DEFAULT_BUFFER_SIZE;
@@ -66,7 +67,9 @@ public class PhysicalPlanLog extends Log {
       dataOutputStream.writeLong(getCurrLogIndex());
       dataOutputStream.writeLong(getCurrLogTerm());
 
-      plan.serialize(dataOutputStream);
+      ByteBuffer byteBuffer = request.serializeToByteBuffer();
+      dataOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(),
+          byteBuffer.limit() - byteBuffer.position());
     } catch (IOException e) {
       // unreachable
     }
@@ -79,7 +82,7 @@ public class PhysicalPlanLog extends Log {
     buffer.put((byte) PHYSICAL_PLAN.ordinal());
     buffer.putLong(getCurrLogIndex());
     buffer.putLong(getCurrLogTerm());
-    plan.serialize(buffer);
+    buffer.put(request.serializeToByteBuffer());
   }
 
   @Override
@@ -88,7 +91,7 @@ public class PhysicalPlanLog extends Log {
     setCurrLogTerm(buffer.getLong());
 
     try {
-      plan = PhysicalPlan.Factory.create(buffer);
+      request = PhysicalPlan.Factory.create(buffer);
     } catch (IOException | IllegalPathException e) {
       logger.error(
           "Cannot parse a physical {}:{} plan {}",
@@ -99,17 +102,17 @@ public class PhysicalPlanLog extends Log {
     }
   }
 
-  public PhysicalPlan getPlan() {
-    return plan;
+  public IConsensusRequest getRequest() {
+    return request;
   }
 
-  public void setPlan(PhysicalPlan plan) {
-    this.plan = plan;
+  public void setRequest(IConsensusRequest request) {
+    this.request = request;
   }
 
   @Override
   public String toString() {
-    return plan + ",term:" + getCurrLogTerm() + ",index:" + getCurrLogIndex();
+    return request + ",term:" + getCurrLogTerm() + ",index:" + 
getCurrLogIndex();
   }
 
   @Override
@@ -123,12 +126,12 @@ public class PhysicalPlanLog extends Log {
     if (!super.equals(o)) {
       return false;
     }
-    PhysicalPlanLog that = (PhysicalPlanLog) o;
-    return Objects.equals(plan, that.plan);
+    RequestLog that = (RequestLog) o;
+    return Objects.equals(request, that.request);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), plan);
+    return Objects.hash(super.hashCode(), request);
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 5b0cfc85b1..b3fdb6d0af 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.VotingLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -104,8 +104,8 @@ public class AsynchronousSequencer implements LogSequencer {
         log.setSequenceStartTime(sequenceStartTime);
         log.setCurrLogTerm(member.getTerm().get());
         log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-        if (log instanceof PhysicalPlanLog) {
-          ((PhysicalPlanLog) log).getPlan().setIndex(log.getCurrLogIndex());
+        if (log instanceof RequestLog) {
+          ((RequestLog) log).getRequest().setIndex(log.getCurrLogIndex());
         }
 
         startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index ddb96d18be..bbaf8770ce 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.VotingLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -70,9 +70,9 @@ public class SynchronousSequencer implements LogSequencer {
 
           // if the log contains a physical plan which is not a LogPlan, 
assign the same index to
           // the plan so the state machine can be bridged with the consensus
-          if (log instanceof PhysicalPlanLog
-              && !(((PhysicalPlanLog) log).getPlan() instanceof LogPlan)) {
-            ((PhysicalPlanLog) 
log).getPlan().setIndex(logManager.getLastLogIndex() + 1);
+          if (log instanceof RequestLog
+              && !(((RequestLog) log).getRequest() instanceof LogPlan)) {
+            ((RequestLog) 
log).getRequest().setIndex(logManager.getLastLogIndex() + 1);
           }
           log.setCurrLogTerm(member.getTerm().get());
           log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
index 0ca430d12e..44b317d505 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.server.handlers.forwarder;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -34,13 +34,13 @@ import java.util.concurrent.atomic.AtomicReference;
 public class ForwardPlanHandler implements AsyncMethodCallback<TSStatus> {
 
   private static final Logger logger = 
LoggerFactory.getLogger(ForwardPlanHandler.class);
-  private PhysicalPlan plan;
+  private IConsensusRequest request;
   private AtomicReference<TSStatus> result;
   private Node node;
 
-  public ForwardPlanHandler(AtomicReference<TSStatus> result, PhysicalPlan 
plan, Node node) {
+  public ForwardPlanHandler(AtomicReference<TSStatus> result, 
IConsensusRequest request, Node node) {
     this.result = result;
-    this.plan = plan;
+    this.request = request;
     this.node = node;
   }
 
@@ -55,9 +55,9 @@ public class ForwardPlanHandler implements 
AsyncMethodCallback<TSStatus> {
   @Override
   public void onError(Exception exception) {
     if (exception instanceof IOException) {
-      logger.warn("Cannot send plan {} to node {}: {}", plan, node, 
exception.getMessage());
+      logger.warn("Cannot send plan {} to node {}: {}", request, node, 
exception.getMessage());
     } else {
-      logger.error("Cannot send plan {} to node {}", plan, node, exception);
+      logger.error("Cannot send plan {} to node {}", request, node, exception);
     }
     synchronized (result) {
       TSStatus status = StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, 
exception.getMessage());
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 6e03281060..173b7bcc9c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -72,15 +72,20 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion.TimePartitionFilter;
@@ -186,6 +191,7 @@ public class DataGroupMember extends RaftMember implements 
DataGroupMemberMBean
             + "-raftId-"
             + nodes.getRaftId()
             + "";
+    groupId = new DataRegionId(0);
     setThisNode(thisNode);
     setAllNodes(nodes);
     mbeanName =
@@ -218,6 +224,7 @@ public class DataGroupMember extends RaftMember implements 
DataGroupMemberMBean
         new ClientManager(
             ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
             ClientManager.Type.DataGroupClient));
+    groupId = new DataRegionId(nodes.getHeader().node.nodeIdentifier + 
nodes.getHeader().raftId);
     this.metaGroupMember = metaGroupMember;
     setThisNode(thisNode);
     setAllNodes(nodes);
@@ -694,65 +701,66 @@ public class DataGroupMember extends RaftMember 
implements DataGroupMemberMBean
    * Execute a non-query plan. If the member is a leader, a log for the plan 
will be created and
    * process through the raft procedure, otherwise the plan will be forwarded 
to the leader.
    *
-   * @param plan a non-query plan.
+   * @param request a non-query plan.
    */
   @Override
-  public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+  public ConsensusWriteResponse executeRequest(IConsensusRequest request) {
     if (ClusterDescriptor.getInstance().getConfig().getReplicationNum() == 1) {
       try {
-        if (plan instanceof LogPlan) {
+        if (request instanceof LogPlan) {
           Log log;
           try {
-            log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+            log = LogParser.getINSTANCE().parse(((LogPlan) request).getLog());
           } catch (UnknownLogTypeException e) {
-            logger.error("Can not parse LogPlan {}", plan, e);
-            return StatusUtils.PARSE_LOG_ERROR;
+            logger.error("Can not parse LogPlan {}", request, e);
+            return new ConsensusWriteResponse(null, 
StatusUtils.PARSE_LOG_ERROR);
           }
           handleChangeMembershipLogWithoutRaft(log);
         } else {
-          ((DataLogApplier) dataLogApplier).applyPhysicalPlan(plan);
+          ((DataLogApplier) dataLogApplier).applyRequest(request);
         }
-        return StatusUtils.OK;
+        return new ConsensusWriteResponse(null, StatusUtils.OK);
       } catch (Exception e) {
         Throwable cause = IOUtils.getRootCause(e);
         boolean hasCreated = false;
         try {
-          if (plan instanceof InsertPlan
+          if (request instanceof InsertPlan
               && 
ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
-            if (plan instanceof InsertRowsPlan || plan instanceof 
InsertMultiTabletsPlan) {
+            if (request instanceof InsertRowsPlan || request instanceof 
InsertMultiTabletsPlan) {
               if (e instanceof BatchProcessException) {
                 for (TSStatus status : ((BatchProcessException) 
e).getFailingStatus()) {
                   if (status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
-                    hasCreated = 
createTimeseriesForFailedInsertion(((InsertPlan) plan));
-                    ((BatchPlan) plan).getResults().clear();
+                    hasCreated = 
createTimeseriesForFailedInsertion(((InsertPlan) request));
+                    ((BatchPlan) request).getResults().clear();
                     break;
                   }
                 }
               }
             } else if (cause instanceof PathNotExistException) {
-              hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) 
plan));
+              hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) 
request));
             }
           }
         } catch (MetadataException | CheckConsistencyException ex) {
-          logger.error("{}: Cannot auto-create timeseries for {}", name, plan, 
e);
-          return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
ex.getMessage());
+          logger.error("{}: Cannot auto-create timeseries for {}", name, 
request, e);
+          return new ConsensusWriteResponse(null,
+              StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
ex.getMessage()));
         }
         if (hasCreated) {
-          return executeNonQueryPlan(plan);
+          return executeRequest(request);
         }
-        return handleLogExecutionException(plan, cause);
+        return new ConsensusWriteResponse(null, 
handleLogExecutionException(request, cause));
       }
     } else {
-      TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
+      TSStatus status = executeNonQueryPlanWithKnownLeader(request);
       if (!StatusUtils.NO_LEADER.equals(status)) {
-        return status;
+        return new ConsensusWriteResponse(null, status);
       }
 
       long startTime = 
Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
       waitLeader();
       
Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
 
-      return executeNonQueryPlanWithKnownLeader(plan);
+      return new ConsensusWriteResponse(null, 
executeNonQueryPlanWithKnownLeader(request));
     }
   }
 
@@ -788,42 +796,44 @@ public class DataGroupMember extends RaftMember 
implements DataGroupMemberMBean
     }
   }
 
-  private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) {
+  private TSStatus executeNonQueryPlanWithKnownLeader(IConsensusRequest 
request) {
     if (character == NodeCharacter.LEADER) {
-      if (plan.getTargetedTerm() > 0 && plan.getTargetedTerm() != term.get()) {
+      if ((request instanceof PhysicalPlan)
+          && ((PhysicalPlan) request).getTargetedTerm() > 0
+          && ((PhysicalPlan) request).getTargetedTerm() != term.get()) {
         return StatusUtils.getStatus(TSStatusCode.LEADER_CHANGED)
             .setMessage(getRaftGroupFullId() + "-" + term.get());
       }
 
       long startTime = 
Statistic.DATA_GROUP_MEMBER_LOCAL_EXECUTION.getOperationStartTime();
-      TSStatus status = processPlanLocally(plan);
+      TSStatus status = processPlanLocally(request);
       boolean hasCreated = false;
       try {
-        if (plan instanceof InsertPlan
+        if (request instanceof InsertPlan
             && 
ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
-          if (plan instanceof InsertRowsPlan || plan instanceof 
InsertMultiTabletsPlan) {
+          if (request instanceof InsertRowsPlan || request instanceof 
InsertMultiTabletsPlan) {
             if (status.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
               for (TSStatus tmpStatus : status.getSubStatus()) {
                 if (tmpStatus.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
-                  hasCreated = 
createTimeseriesForFailedInsertion(((InsertPlan) plan));
-                  ((BatchPlan) plan).getResults().clear();
+                  hasCreated = 
createTimeseriesForFailedInsertion(((InsertPlan) request));
+                  ((BatchPlan) request).getResults().clear();
                   break;
                 }
               }
             }
           } else {
             if (status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
-              hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) 
plan));
+              hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) 
request));
             }
           }
         }
       } catch (MetadataException | CheckConsistencyException e) {
-        logger.error("{}: Cannot auto-create timeseries for {}", name, plan, 
e);
+        logger.error("{}: Cannot auto-create timeseries for {}", name, 
request, e);
         return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
       }
 
       if (hasCreated) {
-        status = processPlanLocally(plan);
+        status = processPlanLocally(request);
       }
       
Statistic.DATA_GROUP_MEMBER_LOCAL_EXECUTION.calOperationCostTimeFromStart(startTime);
       if (status != null) {
@@ -831,7 +841,7 @@ public class DataGroupMember extends RaftMember implements 
DataGroupMemberMBean
       }
     } else if (leader.get() != null && 
!ClusterConstant.EMPTY_NODE.equals(leader.get())) {
       long startTime = 
Timer.Statistic.DATA_GROUP_MEMBER_FORWARD_PLAN.getOperationStartTime();
-      TSStatus result = forwardPlan(plan, leader.get(), getHeader());
+      TSStatus result = forwardPlan(request, leader.get(), getHeader());
       
Timer.Statistic.DATA_GROUP_MEMBER_FORWARD_PLAN.calOperationCostTimeFromStart(startTime);
       if (!StatusUtils.NO_LEADER.equals(result)) {
         result.setRedirectNode(
@@ -1161,4 +1171,9 @@ public class DataGroupMember extends RaftMember 
implements DataGroupMemberMBean
       this.version = version;
     }
   }
+
+  @Override
+  public int getPort(Node node) {
+    return node.getDataPort();
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 65c767b0de..d2931ed34d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -76,11 +76,14 @@ import 
org.apache.iotdb.cluster.utils.nodetool.function.Status;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -206,7 +209,9 @@ public class MetaGroupMember extends RaftMember implements 
IService, MetaGroupMe
   boolean ready = false;
 
   @TestOnly
-  public MetaGroupMember() {}
+  public MetaGroupMember() {
+    groupId = new PartitionRegionId(0);
+  }
 
   public MetaGroupMember(Node thisNode, Coordinator coordinator) {
     super(
@@ -214,6 +219,7 @@ public class MetaGroupMember extends RaftMember implements 
IService, MetaGroupMe
         new ClientManager(
             ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
             ClientManager.Type.MetaGroupClient));
+    groupId = new PartitionRegionId(0);
     setThisNode(thisNode);
     setAllNodes(new PartitionGroup());
     initPeerMap();
@@ -1308,7 +1314,7 @@ public class MetaGroupMember extends RaftMember 
implements IService, MetaGroupMe
    * @param plan a non-query plan.
    */
   @Override
-  public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+  public ConsensusWriteResponse executeRequest(IConsensusRequest plan) {
     TSStatus result;
     long startTime = 
Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.getOperationStartTime();
     if (PartitionUtils.isGlobalMetaPlan(plan)) {
@@ -1322,7 +1328,7 @@ public class MetaGroupMember extends RaftMember 
implements IService, MetaGroupMe
       result = coordinator.executeNonQueryPlan(plan);
     }
     
Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
-    return result;
+    return new ConsensusWriteResponse(null, result);
   }
 
   @Override
@@ -1341,7 +1347,7 @@ public class MetaGroupMember extends RaftMember 
implements IService, MetaGroupMe
    * Thus the plan will be processed locally only by the MetaLeader and 
forwarded by non-leader
    * nodes.
    */
-  public TSStatus processNonPartitionedMetaPlan(PhysicalPlan plan) {
+  public TSStatus processNonPartitionedMetaPlan(IConsensusRequest plan) {
     if (character == NodeCharacter.LEADER) {
       TSStatus status = processPlanLocally(plan);
       if (status != null) {
@@ -1926,4 +1932,9 @@ public class MetaGroupMember extends RaftMember 
implements IService, MetaGroupMe
   public String getIdNodeMapAsString() {
     return idNodeMap.toString();
   }
+
+  @Override
+  public int getPort(Node node) {
+    return node.getMetaPort();
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 1debf68cb5..0b06c31af5 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -49,7 +49,7 @@ import org.apache.iotdb.cluster.log.appender.LogAppender;
 import org.apache.iotdb.cluster.log.appender.LogAppenderFactory;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
 import org.apache.iotdb.cluster.log.sequencing.LogSequencer;
@@ -81,13 +81,17 @@ import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
@@ -183,6 +187,7 @@ public abstract class RaftMember implements RaftMemberMBean 
{
 
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
   /** the name of the member, to distinguish several members in the logs. */
+  ConsensusGroupId groupId;
   String name;
   /** to choose nodes to send request of joining cluster randomly. */
   Random random = new Random();
@@ -852,28 +857,13 @@ public abstract class RaftMember implements 
RaftMemberMBean {
     }
   }
 
-  /**
-   * If the node is not a leader, the request will be sent to the leader or 
reports an error if
-   * there is no leader. Otherwise execute the plan locally (whether to send 
it to followers depends
-   * on the type of the plan).
-   */
-  public TSStatus executeNonQueryPlan(ExecutNonQueryReq request)
-      throws IOException, IllegalPathException {
-    // process the plan locally
-    PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
-
-    TSStatus answer = executeNonQueryPlan(plan);
-    logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, 
answer);
-    return answer;
-  }
-
   /**
    * Execute a non-query plan. Subclass may have their individual implements.
    *
    * @param plan a non-query plan.
    * @return A TSStatus indicating the execution result.
    */
-  protected abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
+  public abstract ConsensusWriteResponse executeRequest(IConsensusRequest 
plan);
 
   abstract ClientCategory getClientCategory();
 
@@ -1109,29 +1099,29 @@ public abstract class RaftMember implements 
RaftMemberMBean {
    * @return OK if over half of the followers accept the log or null if the 
leadership is lost
    *     during the appending
    */
-  public TSStatus processPlanLocally(PhysicalPlan plan) {
+  public TSStatus processPlanLocally(IConsensusRequest request) {
     if (USE_LOG_DISPATCHER) {
-      return processPlanLocallyV2(plan);
+      return processPlanLocallyV2(request);
     }
 
-    logger.debug("{}: Processing plan {}", name, plan);
-    if (readOnly && !(plan instanceof LogPlan)) {
+    logger.debug("{}: Processing plan {}", name, request);
+    if (readOnly && !(request instanceof LogPlan)) {
       return StatusUtils.NODE_READ_ONLY;
     }
     long startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
 
     Log log;
 
-    if (plan instanceof LogPlan) {
+    if (request instanceof LogPlan) {
       try {
-        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+        log = LogParser.getINSTANCE().parse(((LogPlan) request).getLog());
       } catch (UnknownLogTypeException e) {
-        logger.error("Can not parse LogPlan {}", plan, e);
+        logger.error("Can not parse LogPlan {}", request, e);
         return StatusUtils.PARSE_LOG_ERROR;
       }
     } else {
-      log = new PhysicalPlanLog();
-      ((PhysicalPlanLog) log).setPlan(plan);
+      log = new RequestLog();
+      ((RequestLog) log).setRequest(request);
     }
 
     // if a single log exceeds the threshold
@@ -1152,8 +1142,8 @@ public abstract class RaftMember implements 
RaftMemberMBean {
       synchronized (logManager) {
         if (logManager.getLastLogIndex() - logManager.getCommitLogIndex()
             <= config.getUnCommittedRaftLogNumForRejectThreshold()) {
-          if (!(plan instanceof LogPlan)) {
-            plan.setIndex(logManager.getLastLogIndex() + 1);
+          if (!(request instanceof LogPlan) && (request instanceof 
PhysicalPlan)) {
+            ((PhysicalPlan) request).setIndex(logManager.getLastLogIndex() + 
1);
           }
           log.setCurrLogTerm(getTerm().get());
           log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
@@ -1187,7 +1177,7 @@ public abstract class RaftMember implements 
RaftMemberMBean {
     }
   }
 
-  protected TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+  protected TSStatus processPlanLocallyV2(IConsensusRequest plan) {
     long totalStartTime = System.nanoTime();
     logger.debug("{}: Processing plan {}", name, plan);
     if (readOnly) {
@@ -1203,8 +1193,8 @@ public abstract class RaftMember implements 
RaftMemberMBean {
         return StatusUtils.PARSE_LOG_ERROR;
       }
     } else {
-      log = new PhysicalPlanLog();
-      ((PhysicalPlanLog) log).setPlan(plan);
+      log = new RequestLog();
+      ((RequestLog) log).setRequest(plan);
     }
 
     if (USE_CRAFT && allNodes.size() > 2) {
@@ -1473,7 +1463,7 @@ public abstract class RaftMember implements 
RaftMemberMBean {
    *     communication
    * @return a TSStatus indicating if the forwarding is successful.
    */
-  public TSStatus forwardPlan(PhysicalPlan plan, Node node, RaftNode header) {
+  public TSStatus forwardPlan(IConsensusRequest plan, Node node, RaftNode 
header) {
     if (node == null || node.equals(thisNode)) {
       logger.debug("{}: plan {} has no where to be forwarded", name, plan);
       return StatusUtils.NO_LEADER;
@@ -1505,7 +1495,7 @@ public abstract class RaftMember implements 
RaftMemberMBean {
    * @param header to determine which DataGroupMember of "receiver" will 
process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
-  private TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, RaftNode 
header) {
+  private TSStatus forwardPlanAsync(IConsensusRequest plan, Node receiver, 
RaftNode header) {
     AsyncClient client = getAsyncClient(receiver);
     if (client == null) {
       logger.debug("{}: can not get client for node={}", name, receiver);
@@ -1517,38 +1507,38 @@ public abstract class RaftMember implements 
RaftMemberMBean {
   }
 
   public TSStatus forwardPlanAsync(
-      PhysicalPlan plan, Node receiver, RaftNode header, AsyncClient client) {
+      IConsensusRequest request, Node receiver, RaftNode header, AsyncClient 
client) {
     try {
-      TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan, 
header, receiver);
+      TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, request, 
header, receiver);
       if (tsStatus == null) {
         tsStatus = StatusUtils.TIME_OUT;
-        logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
+        logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
       }
       return tsStatus;
     } catch (IOException | TException e) {
-      logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
+      logger.error(MSG_FORWARD_ERROR, name, request, receiver, e);
       return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      logger.warn("{}: forward {} to {} interrupted", name, plan, receiver);
+      logger.warn("{}: forward {} to {} interrupted", name, request, receiver);
       return StatusUtils.TIME_OUT;
     }
   }
 
-  private TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, RaftNode 
header) {
+  private TSStatus forwardPlanSync(IConsensusRequest request, Node receiver, 
RaftNode header) {
     Client client = getSyncClient(receiver);
     if (client == null) {
-      logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
+      logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
       return StatusUtils.TIME_OUT;
     }
-    return forwardPlanSync(plan, receiver, header, client);
+    return forwardPlanSync(request, receiver, header, client);
   }
 
   public TSStatus forwardPlanSync(
-      PhysicalPlan plan, Node receiver, RaftNode header, Client client) {
+      IConsensusRequest request, Node receiver, RaftNode header, Client 
client) {
     try {
       ExecutNonQueryReq req = new ExecutNonQueryReq();
-      req.setPlanBytes(PlanSerializer.getInstance().serialize(plan));
+      req.setPlanBytes(request.serializeToByteBuffer());
       if (header != null) {
         req.setHeader(header);
       }
@@ -1556,19 +1546,16 @@ public abstract class RaftMember implements 
RaftMemberMBean {
       TSStatus tsStatus = client.executeNonQueryPlan(req);
       if (tsStatus == null) {
         tsStatus = StatusUtils.TIME_OUT;
-        logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
+        logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
       }
       return tsStatus;
-    } catch (IOException e) {
-      logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
-      return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
     } catch (TException e) {
       TSStatus status;
       if (e.getCause() instanceof SocketTimeoutException) {
         status = StatusUtils.TIME_OUT;
-        logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
+        logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
       } else {
-        logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
+        logger.error(MSG_FORWARD_ERROR, name, request, receiver, e);
         status = StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, 
e.getMessage());
       }
       // the connection may be broken, close it to avoid it being reused
@@ -1694,12 +1681,12 @@ public abstract class RaftMember implements 
RaftMemberMBean {
   }
 
   private boolean canBeWeaklyAccepted(Log log) {
-    if (!(log instanceof PhysicalPlanLog)) {
+    if (!(log instanceof RequestLog)) {
       return false;
     }
-    PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
-    return physicalPlanLog.getPlan() instanceof InsertPlan
-        || physicalPlanLog.getPlan() instanceof DummyPlan;
+    RequestLog requestLog = (RequestLog) log;
+    return requestLog.getRequest() instanceof InsertPlan
+        || requestLog.getRequest() instanceof DummyPlan;
   }
 
   /**
@@ -2380,4 +2367,25 @@ public abstract class RaftMember implements 
RaftMemberMBean {
   public LogRelay getLogRelay() {
     return logRelay;
   }
+
+  public ConsensusGroupId getGroupId() {
+    return groupId;
+  }
+
+  public abstract int getPort(Node node);
+
+  public Peer getThisPeer() {
+    return new Peer(groupId, new TEndPoint(thisNode.internalIp, 
getPort(thisNode)));
+  }
+
+  public Peer getLeaderPeer() {
+    if (leader.get() == null) {
+      return null;
+    }
+    return new Peer(groupId, new TEndPoint(leader.get().internalIp, 
getPort(leader.get())));
+  }
+
+  public boolean isLeader() {
+    return ClusterUtils.nodeEqual(leader.get(), thisNode);
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index bb87809543..2a83adc28e 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 
@@ -46,9 +47,13 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class BaseAsyncService implements RaftService.AsyncIface {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(BaseAsyncService.class);
+
   RaftMember member;
   String name;
 
@@ -166,7 +171,11 @@ public abstract class BaseAsyncService implements 
RaftService.AsyncIface {
     }
 
     try {
-      TSStatus status = member.executeNonQueryPlan(request);
+      // process the plan locally
+      PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
+
+      TSStatus status = member.executeRequest(plan);
+      logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, 
status);
       resultHandler.onComplete(
           StatusUtils.getStatus(
               status,
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index a438d3cb54..8119eceda5 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,7 +154,12 @@ public abstract class BaseSyncService implements 
RaftService.Iface {
   @Override
   public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws 
TException {
     try {
-      return member.executeNonQueryPlan(request);
+      // process the plan locally
+      PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
+
+      TSStatus answer = member.executeRequest(plan);
+      logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, 
answer);
+      return answer;
     } catch (Exception e) {
       throw new TException(e);
     }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index b8e452657c..55761cd1eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.utils;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -69,7 +70,7 @@ public class PartitionUtils {
    * @param plan
    * @return
    */
-  public static boolean isLocalNonQueryPlan(PhysicalPlan plan) {
+  public static boolean isLocalNonQueryPlan(IConsensusRequest plan) {
     return plan instanceof LoadDataPlan
         || plan instanceof OperateFilePlan
         || plan instanceof KillQueryPlan
@@ -85,7 +86,7 @@ public class PartitionUtils {
    * @param plan
    * @return
    */
-  public static boolean isGlobalMetaPlan(PhysicalPlan plan) {
+  public static boolean isGlobalMetaPlan(IConsensusRequest plan) {
     return plan instanceof SetStorageGroupPlan
         || plan instanceof SetTTLPlan
         || plan instanceof ShowTTLPlan
@@ -113,7 +114,7 @@ public class PartitionUtils {
    * @param plan the plan to check
    * @return is globalDataPlan or not
    */
-  public static boolean isGlobalDataPlan(PhysicalPlan plan) {
+  public static boolean isGlobalDataPlan(IConsensusRequest plan) {
     return
     // because deletePlan has an infinite time range.
     plan instanceof DeletePlan
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestLogApplier.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestLogApplier.java
index 56ca05c2ef..663164dcf4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestLogApplier.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestLogApplier.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.common;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -37,9 +37,9 @@ public class TestLogApplier implements LogApplier {
   @Override
   public void apply(Log log) {
     try {
-      if (log instanceof PhysicalPlanLog) {
-        PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
-        getPlanExecutor().processNonQuery(physicalPlanLog.getPlan());
+      if (log instanceof RequestLog) {
+        RequestLog requestLog = (RequestLog) log;
+        getPlanExecutor().processNonQuery(requestLog.getRequest());
       } else if (log instanceof CloseFileLog) {
         CloseFileLog closeFileLog = ((CloseFileLog) log);
         try {
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 6969448090..768ea092ca 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -24,7 +24,7 @@ import 
org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -60,10 +60,10 @@ public class LogParserTest {
 
   @Test
   public void testPhysicalPlanLog() throws UnknownLogTypeException, 
IllegalPathException {
-    PhysicalPlanLog log = new PhysicalPlanLog();
+    RequestLog log = new RequestLog();
     SetStorageGroupPlan setStorageGroupPlan =
         new SetStorageGroupPlan(new PartialPath(TestUtils.getTestSg(5)));
-    log.setPlan(setStorageGroupPlan);
+    log.setRequest(setStorageGroupPlan);
     log.setCurrLogIndex(8);
     log.setCurrLogTerm(8);
 
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
index af86c48b57..2866927b4f 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -68,9 +68,9 @@ public class AsyncDataLogApplierTest {
   public void test() throws IllegalPathException, InterruptedException {
     LogApplier dummyApplier =
         log -> {
-          if (log instanceof PhysicalPlanLog) {
-            PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
-            PhysicalPlan plan = physicalPlanLog.getPlan();
+          if (log instanceof RequestLog) {
+            RequestLog requestLog = (RequestLog) log;
+            PhysicalPlan plan = requestLog.getRequest();
             if (plan instanceof InsertRowPlan) {
               appliedLogs.add(log);
               log.setApplied(true);
@@ -89,7 +89,7 @@ public class AsyncDataLogApplierTest {
         PhysicalPlan plan =
             new InsertRowPlan(
                 new PartialPath(TestUtils.getTestSg(i)), i, new String[0], new 
String[0]);
-        PhysicalPlanLog log = new PhysicalPlanLog(plan);
+        RequestLog log = new RequestLog(plan);
         log.setCurrLogIndex(i);
         logsToApply.add(log);
       }
@@ -116,9 +116,9 @@ public class AsyncDataLogApplierTest {
   public void testParallel() {
     LogApplier dummyApplier =
         log -> {
-          if (log instanceof PhysicalPlanLog) {
-            PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
-            PhysicalPlan plan = physicalPlanLog.getPlan();
+          if (log instanceof RequestLog) {
+            RequestLog requestLog = (RequestLog) log;
+            PhysicalPlan plan = requestLog.getRequest();
             if (plan instanceof InsertRowPlan) {
               appliedLogs.add(log);
               log.setApplied(true);
@@ -149,7 +149,7 @@ public class AsyncDataLogApplierTest {
                     } catch (IllegalPathException e) {
                       // ignore
                     }
-                    PhysicalPlanLog log = new PhysicalPlanLog(plan);
+                    RequestLog log = new RequestLog(plan);
                     log.setCurrLogIndex(finalI * 11 + j);
                     threadLogsToApply.add(log);
                   }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 8e739462e7..728ce7dc20 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.metadata.CSchemaProcessor;
 import org.apache.iotdb.cluster.metadata.MetaPuller;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -301,8 +301,8 @@ public class DataLogApplierTest extends IoTDBTest {
       throws QueryProcessException, IOException, 
QueryFilterOptimizationException,
           StorageEngineException, MetadataException, InterruptedException {
     InsertRowPlan insertPlan = new InsertRowPlan();
-    PhysicalPlanLog log = new PhysicalPlanLog();
-    log.setPlan(insertPlan);
+    RequestLog log = new RequestLog();
+    log.setRequest(insertPlan);
 
     // this series is already created
     insertPlan.setDevicePath(new PartialPath(TestUtils.getTestSg(1)));
@@ -357,8 +357,8 @@ public class DataLogApplierTest extends IoTDBTest {
       throws MetadataException, QueryProcessException, StorageEngineException, 
IOException,
           InterruptedException, QueryFilterOptimizationException {
     InsertRowsPlan insertRowsPlan = new InsertRowsPlan();
-    PhysicalPlanLog log = new PhysicalPlanLog();
-    log.setPlan(insertRowsPlan);
+    RequestLog log = new RequestLog();
+    log.setRequest(insertRowsPlan);
 
     for (int i = 1; i <= 4; i++) {
       InsertRowPlan insertPlan = new InsertRowPlan();
@@ -394,7 +394,7 @@ public class DataLogApplierTest extends IoTDBTest {
     DeletePlan deletePlan = new DeletePlan();
     deletePlan.setPaths(Collections.singletonList(new 
PartialPath(TestUtils.getTestSeries(0, 0))));
     deletePlan.setDeleteEndTime(50);
-    applier.apply(new PhysicalPlanLog(deletePlan));
+    applier.apply(new RequestLog(deletePlan));
     QueryDataSet dataSet = 
query(Collections.singletonList(TestUtils.getTestSeries(0, 0)), null);
     int cnt = 0;
     while (dataSet.hasNext()) {
@@ -422,7 +422,7 @@ public class DataLogApplierTest extends IoTDBTest {
     // existing sg
     FlushPlan flushPlan =
         new FlushPlan(null, Collections.singletonList(new 
PartialPath(TestUtils.getTestSg(0))));
-    PhysicalPlanLog log = new PhysicalPlanLog(flushPlan);
+    RequestLog log = new RequestLog(flushPlan);
 
     applier.apply(log);
     assertNull(log.getException());
@@ -430,7 +430,7 @@ public class DataLogApplierTest extends IoTDBTest {
     // non-existing sg
     flushPlan =
         new FlushPlan(null, Collections.singletonList(new 
PartialPath(TestUtils.getTestSg(20))));
-    log = new PhysicalPlanLog(flushPlan);
+    log = new RequestLog(flushPlan);
 
     applier.apply(log);
     assertEquals(
@@ -453,7 +453,7 @@ public class DataLogApplierTest extends IoTDBTest {
     multiTimeSeriesPlan.setDataTypes(Arrays.asList(TSDataType.DOUBLE, 
TSDataType.DOUBLE));
     multiTimeSeriesPlan.setEncodings(Arrays.asList(TSEncoding.GORILLA, 
TSEncoding.GORILLA));
 
-    PhysicalPlanLog log = new PhysicalPlanLog(multiTimeSeriesPlan);
+    RequestLog log = new RequestLog(multiTimeSeriesPlan);
     // the applier should sync meta leader to get root.sg2 and report no error
     applier.apply(log);
     assertTrue(
@@ -477,7 +477,7 @@ public class DataLogApplierTest extends IoTDBTest {
         });
 
     DeletePlan deletePlan = new DeletePlan();
-    PhysicalPlanLog log = new PhysicalPlanLog(deletePlan);
+    RequestLog log = new RequestLog(deletePlan);
     applier.apply(log);
     assertNull(log.getException());
   }
@@ -485,16 +485,16 @@ public class DataLogApplierTest extends IoTDBTest {
   @Test
   public void testApplyClearCache() {
     ClearCachePlan clearCachePlan = new ClearCachePlan();
-    PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog(clearCachePlan);
-    applier.apply(physicalPlanLog);
-    assertNull(physicalPlanLog.getException());
+    RequestLog requestLog = new RequestLog(clearCachePlan);
+    applier.apply(requestLog);
+    assertNull(requestLog.getException());
   }
 
   @Test
   public void testApplyMerge() {
     MergePlan mergePlan = new MergePlan();
-    PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog(mergePlan);
-    applier.apply(physicalPlanLog);
-    assertNull(physicalPlanLog.getException());
+    RequestLog requestLog = new RequestLog(mergePlan);
+    applier.apply(requestLog);
+    assertNull(requestLog.getException());
   }
 }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index ff50b10463..d8ebb4f093 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.Constants;
@@ -106,12 +106,12 @@ public class MetaLogApplierTest extends IoTDBTest {
 
   @Test
   public void testApplyMetadataCreation() throws MetadataException {
-    PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog();
+    RequestLog requestLog = new RequestLog();
     SetStorageGroupPlan setStorageGroupPlan =
         new SetStorageGroupPlan(new PartialPath("root.applyMeta"));
-    physicalPlanLog.setPlan(setStorageGroupPlan);
+    requestLog.setRequest(setStorageGroupPlan);
 
-    applier.apply(physicalPlanLog);
+    applier.apply(requestLog);
     assertTrue(IoTDB.schemaProcessor.isPathExist(new 
PartialPath("root.applyMeta")));
 
     CreateTimeSeriesPlan createTimeSeriesPlan =
@@ -124,8 +124,8 @@ public class MetaLogApplierTest extends IoTDBTest {
             Collections.emptyMap(),
             Collections.emptyMap(),
             null);
-    physicalPlanLog.setPlan(createTimeSeriesPlan);
-    applier.apply(physicalPlanLog);
+    requestLog.setRequest(createTimeSeriesPlan);
+    applier.apply(requestLog);
     assertTrue(IoTDB.schemaProcessor.isPathExist(new 
PartialPath("root.applyMeta.s1")));
     assertEquals(
         TSDataType.DOUBLE,
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 35aae54f2d..0d6832b964 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -48,7 +48,7 @@ public class SerializeLogTest {
 
   @Test
   public void testPhysicalPlanLog() throws UnknownLogTypeException, 
IllegalPathException {
-    PhysicalPlanLog log = new PhysicalPlanLog();
+    RequestLog log = new RequestLog();
     log.setCurrLogIndex(2);
     log.setCurrLogTerm(2);
     InsertRowPlan plan = new InsertRowPlan();
@@ -68,19 +68,19 @@ public class SerializeLogTest {
     schemas[2].getSchema().setType(TSDataType.TEXT);
     plan.setMeasurementMNodes(schemas);
     plan.setTime(1);
-    log.setPlan(plan);
+    log.setRequest(plan);
 
     ByteBuffer byteBuffer = log.serialize();
     Log logPrime = LogParser.getINSTANCE().parse(byteBuffer);
     assertEquals(log, logPrime);
 
-    log = new PhysicalPlanLog(new SetStorageGroupPlan(new 
PartialPath("root.sg1")));
+    log = new RequestLog(new SetStorageGroupPlan(new PartialPath("root.sg1")));
     byteBuffer = log.serialize();
     logPrime = LogParser.getINSTANCE().parse(byteBuffer);
     assertEquals(log, logPrime);
 
     log =
-        new PhysicalPlanLog(
+        new RequestLog(
             new CreateTimeSeriesPlan(
                 new PartialPath("root.applyMeta" + ".s1"),
                 TSDataType.DOUBLE,
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index ab3c41ba5f..64b167f3df 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -605,7 +605,7 @@ public class DataGroupMemberTest extends BaseMember {
             Collections.emptyMap(),
             Collections.emptyMap(),
             null);
-    assertEquals(200, 
dataGroupMember.executeNonQueryPlan(createTimeSeriesPlan).code);
+    assertEquals(200, 
dataGroupMember.executeRequest(createTimeSeriesPlan).code);
     assertTrue(IoTDB.schemaProcessor.isPathExist(new 
PartialPath(timeseriesSchema.getFullPath())));
   }
 
@@ -633,7 +633,7 @@ public class DataGroupMemberTest extends BaseMember {
         getLogManager(
             partitionTable.getPartitionGroup(new 
RaftNode(TestUtils.getNode(0), 0)),
             dataGroupMember));
-    assertEquals(200, 
dataGroupMember.executeNonQueryPlan(createTimeSeriesPlan).code);
+    assertEquals(200, 
dataGroupMember.executeRequest(createTimeSeriesPlan).code);
     assertTrue(IoTDB.schemaProcessor.isPathExist(new 
PartialPath(timeseriesSchema.getFullPath())));
     testThreadPool.shutdownNow();
   }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
index 37c9abba12..a1f2be8d2d 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogParser;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -116,7 +116,7 @@ public class SerializeUtilTest {
     tabletPlan.setColumns(columns);
     tabletPlan.setRowCount(times.length);
 
-    Log log = new PhysicalPlanLog(tabletPlan);
+    Log log = new RequestLog(tabletPlan);
     log.setCurrLogTerm(1);
     log.setCurrLogIndex(2);
 
@@ -177,7 +177,7 @@ public class SerializeUtilTest {
     plan.setAttributes(attributesList);
     plan.setAlias(alias);
 
-    Log log = new PhysicalPlanLog(plan);
+    Log log = new RequestLog(plan);
     log.setCurrLogTerm(1);
     log.setCurrLogIndex(2);
 

Reply via email to