This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e04af828e7 [IOTDB-3385] Reduce the serialization size for the Datanode 
consensus layer (#6209)
e04af828e7 is described below

commit e04af828e768ae3c7fb216366cca5a9e1642e0ff
Author: Potato <[email protected]>
AuthorDate: Thu Jun 9 20:42:45 2022 +0800

    [IOTDB-3385] Reduce the serialization size for the Datanode consensus layer 
(#6209)
---
 .../common/request/IndexedConsensusRequest.java    |  15 +--
 .../apache/iotdb/consensus/config/RatisConfig.java |   4 +-
 .../multileader/MultiLeaderServerImpl.java         |  13 +--
 .../multileader/logdispatcher/LogDispatcher.java   |   5 +-
 .../service/MultiLeaderRPCServiceProcessor.java    |   2 +-
 .../multileader/MultiLeaderConsensusTest.java      |   2 -
 .../consensus/statemachine/BaseStateMachine.java   |  15 +++
 .../statemachine/DataRegionStateMachine.java       |  13 +--
 .../statemachine/SchemaRegionStateMachine.java     |   4 +-
 .../plan/planner/plan/node/DeleteRegionNode.java   |  19 ----
 .../db/mpp/plan/planner/plan/node/PlanNode.java    |  26 ++++-
 .../plan/planner/plan/node/write/InsertNode.java   |  22 +----
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  34 ++++---
 .../service/thrift/impl/InternalServiceImpl.java   | 108 ++++++++++-----------
 .../iotdb/db/service/InternalServiceImplTest.java  |  89 ++++++-----------
 .../src/main/thrift/mutlileader.thrift             |   8 +-
 thrift/src/main/thrift/mpp.thrift                  |  23 ++++-
 17 files changed, 177 insertions(+), 225 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 073c0f4b64..3578fd5656 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.consensus.common.request;
 
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
-
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
@@ -32,16 +30,12 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
 
   private final long safelyDeletedSearchIndex;
 
-  /** we do not need to serialize this field as it will be serialized by 
TLogBatch. */
-  private final TLogType type;
-
   private final IConsensusRequest request;
 
   public IndexedConsensusRequest(
-      long searchIndex, long safelyDeletedSearchIndex, TLogType type, 
IConsensusRequest request) {
+      long searchIndex, long safelyDeletedSearchIndex, IConsensusRequest 
request) {
     this.searchIndex = searchIndex;
     this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
-    this.type = type;
     this.request = request;
   }
 
@@ -62,10 +56,6 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
     return safelyDeletedSearchIndex;
   }
 
-  public TLogType getType() {
-    return type;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -77,12 +67,11 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
     IndexedConsensusRequest that = (IndexedConsensusRequest) o;
     return searchIndex == that.searchIndex
         && safelyDeletedSearchIndex == that.safelyDeletedSearchIndex
-        && type == that.type
         && Objects.equals(request, that.request);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(searchIndex, safelyDeletedSearchIndex, type, request);
+    return Objects.hash(searchIndex, safelyDeletedSearchIndex, request);
   }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index 048fd21d76..c62e789f2b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -173,8 +173,8 @@ public class RatisConfig {
     }
 
     public static class Builder {
-      private TimeDuration timeoutMin = TimeDuration.valueOf(2, 
TimeUnit.SECONDS);
-      private TimeDuration timeoutMax = TimeDuration.valueOf(8, 
TimeUnit.SECONDS);
+      private TimeDuration timeoutMin = TimeDuration.valueOf(500, 
TimeUnit.MILLISECONDS);
+      private TimeDuration timeoutMax = TimeDuration.valueOf(1, 
TimeUnit.SECONDS);
       private TimeDuration requestTimeout = TimeDuration.valueOf(20, 
TimeUnit.SECONDS);
       private TimeDuration sleepTime = TimeDuration.valueOf(1, 
TimeUnit.SECONDS);
       private TimeDuration slownessTimeout = TimeDuration.valueOf(10, 
TimeUnit.MINUTES);
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index a999de531f..596e72bd53 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -29,7 +29,6 @@ import 
org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.ratis.Utils;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -153,19 +152,13 @@ public class MultiLeaderServerImpl {
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
     return new IndexedConsensusRequest(
-        controller.incrementAndGet(),
-        getCurrentSafelyDeletedSearchIndex(),
-        TLogType.FragmentInstance,
-        request);
+        controller.incrementAndGet(), getCurrentSafelyDeletedSearchIndex(), 
request);
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
-      ByteBufferConsensusRequest request, TLogType type) {
+      ByteBufferConsensusRequest request) {
     return new IndexedConsensusRequest(
-        ConsensusReqReader.DEFAULT_SEARCH_INDEX,
-        getCurrentSafelyDeletedSearchIndex(),
-        type,
-        request);
+        ConsensusReqReader.DEFAULT_SEARCH_INDEX, 
getCurrentSafelyDeletedSearchIndex(), request);
   }
 
   /**
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 88e42fbb7f..8a6a94969e 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -31,7 +31,6 @@ import 
org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClie
 import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
 import 
org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
@@ -281,7 +280,7 @@ public class LogDispatcher {
         if (data != null) {
           // since WAL can no longer recover FragmentInstance, but only 
PlanNode, we need to give
           // special flags to use different deserialization methods in the 
dataRegion stateMachine
-          logBatches.add(new TLogBatch(TLogType.InsertNode, 
data.serializeToByteBuffer()));
+          logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
         }
       }
       return currentIndex - 1;
@@ -289,7 +288,7 @@ public class LogDispatcher {
 
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, List<TLogBatch> logBatches) {
-      logBatches.add(new TLogBatch(TLogType.FragmentInstance, 
request.serializeToByteBuffer()));
+      logBatches.add(new TLogBatch(request.serializeToByteBuffer()));
     }
   }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 05a2c4cb03..d829f403d0 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -71,7 +71,7 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
             impl.getStateMachine()
                 .write(
                     impl.buildIndexedConsensusRequestForRemoteRequest(
-                        new ByteBufferConsensusRequest(batch.data), 
batch.type)));
+                        new ByteBufferConsensusRequest(batch.data))));
       }
     }
     logger.debug("Execute TSyncLogReq for {} with result {}", 
req.consensusGroupId, statuses);
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index b03ffa8df0..fa86215466 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -33,7 +33,6 @@ import 
org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -374,7 +373,6 @@ public class MultiLeaderConsensusTest {
             new IndexedConsensusRequest(
                 ((IndexedConsensusRequest) request).getSearchIndex(),
                 -1,
-                TLogType.FragmentInstance,
                 new TestEntry(buffer.getInt(), Peer.deserialize(buffer))));
       } else {
         requestSet.add(((IndexedConsensusRequest) request));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index e42e393788..6d8dc5f62b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,4 +45,17 @@ public abstract class BaseStateMachine implements 
IStateMachine, IStateMachine.E
     }
     return instance;
   }
+
+  protected PlanNode getPlanNode(IConsensusRequest request) {
+    PlanNode node;
+    if (request instanceof ByteBufferConsensusRequest) {
+      node = PlanNodeType.deserialize(request.serializeToByteBuffer());
+    } else if (request instanceof PlanNode) {
+      node = (PlanNode) request;
+    } else {
+      logger.error("Unexpected IConsensusRequest : {}", request);
+      throw new IllegalArgumentException("Unexpected IConsensusRequest!");
+    }
+    return node;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 1cf07f56d4..ad33a65ae2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -34,7 +33,6 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -104,14 +102,7 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
     try {
       if (request instanceof IndexedConsensusRequest) {
         IndexedConsensusRequest indexedConsensusRequest = 
(IndexedConsensusRequest) request;
-        if (indexedConsensusRequest.getType() == TLogType.InsertNode) {
-          planNode =
-              PlanNodeType.deserialize(
-                  
indexedConsensusRequest.getRequest().serializeToByteBuffer());
-        } else {
-          planNode =
-              
getFragmentInstance(indexedConsensusRequest.getRequest()).getFragment().getRoot();
-        }
+        planNode = getPlanNode(indexedConsensusRequest.getRequest());
         if (planNode instanceof InsertNode) {
           ((InsertNode) planNode)
               .setSearchIndex(((IndexedConsensusRequest) 
request).getSearchIndex());
@@ -120,7 +111,7 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
                   ((IndexedConsensusRequest) 
request).getSafelyDeletedSearchIndex());
         }
       } else {
-        planNode = getFragmentInstance(request).getFragment().getRoot();
+        planNode = getPlanNode(request);
       }
       return write(planNode);
     } catch (IllegalArgumentException e) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 930093f7a6..93b2a1d06a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -26,7 +26,6 @@ import 
org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -66,8 +65,7 @@ public class SchemaRegionStateMachine extends 
BaseStateMachine {
   public TSStatus write(IConsensusRequest request) {
     logger.info("Execute write plan in SchemaRegionStateMachine");
     try {
-      PlanNode planNode = getFragmentInstance(request).getFragment().getRoot();
-      return planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
+      return getPlanNode(request).accept(new SchemaExecutionVisitor(), 
schemaRegion);
     } catch (IllegalArgumentException e) {
       logger.error(e.getMessage(), e);
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/DeleteRegionNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/DeleteRegionNode.java
index 17f020c64d..ff1bb66f7e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/DeleteRegionNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/DeleteRegionNode.java
@@ -22,14 +22,9 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,8 +32,6 @@ import java.util.List;
 
 public class DeleteRegionNode extends WritePlanNode implements 
IConsensusRequest {
 
-  private final Logger logger = 
LoggerFactory.getLogger(DeleteRegionNode.class);
-
   protected ConsensusGroupId consensusGroupId;
 
   public DeleteRegionNode(PlanNodeId id) {
@@ -117,16 +110,4 @@ public class DeleteRegionNode extends WritePlanNode 
implements IConsensusRequest
     ReadWriteIOUtils.write(consensusGroupId.getType().getValue(), stream);
     ReadWriteIOUtils.write(consensusGroupId.getId(), stream);
   }
-
-  @Override
-  public ByteBuffer serializeToByteBuffer() {
-    try (PublicBAOS publicBAOS = new PublicBAOS();
-        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
-      super.serialize(outputStream);
-      return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
-    } catch (IOException e) {
-      logger.error("Unexpected error occurs when serializing this 
DeleteRegionNode.", e);
-      throw new SerializationRunTimeException(e);
-    }
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index 07f8746ee9..9b0d600cdb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -18,9 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.lang.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,7 +36,10 @@ import java.util.Objects;
 import static java.util.Objects.requireNonNull;
 
 /** The base class of query logical plan nodes, which is used to compose 
logical query plan. */
-public abstract class PlanNode {
+public abstract class PlanNode implements IConsensusRequest {
+
+  private final Logger logger = LoggerFactory.getLogger(PlanNode.class);
+
   protected static final int NO_CHILD_ALLOWED = 0;
   protected static final int ONE_CHILD = 1;
   protected static final int CHILD_COUNT_NO_LIMIT = -1;
@@ -113,6 +121,22 @@ public abstract class PlanNode {
     }
   }
 
+  /**
+   * Deserialize via {@link
+   * 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType#deserialize(ByteBuffer)}
+   */
+  @Override
+  public ByteBuffer serializeToByteBuffer() {
+    try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      serialize(outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    } catch (IOException e) {
+      logger.error("Unexpected error occurs when serializing writePlanNode.", 
e);
+      throw new SerializationRunTimeException(e);
+    }
+  }
+
   protected abstract void serializeAttributes(ByteBuffer byteBuffer);
 
   protected abstract void serializeAttributes(DataOutputStream stream) throws 
IOException;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index d880cf23ad..92dc6e6e63 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -20,10 +20,8 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -32,7 +30,6 @@ import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
@@ -49,7 +46,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public abstract class InsertNode extends WritePlanNode implements 
IConsensusRequest {
+public abstract class InsertNode extends WritePlanNode {
 
   private final Logger logger = LoggerFactory.getLogger(InsertNode.class);
   /** this insert node doesn't need to participate in multi-leader consensus */
@@ -179,23 +176,6 @@ public abstract class InsertNode extends WritePlanNode 
implements IConsensusRequ
     this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
   }
 
-  /**
-   * Deserialize via {@link
-   * 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType#deserialize(ByteBuffer)}
-   */
-  @Override
-  public ByteBuffer serializeToByteBuffer() {
-    try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
-        DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      serializeAttributes(outputStream);
-      getPlanNodeId().serialize(outputStream);
-      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
-    } catch (IOException e) {
-      logger.error("Unexpected error occurs when serializing this 
InsertNode.", e);
-      throw new SerializationRunTimeException(e);
-    }
-  }
-
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     throw new NotImplementedException("serializeAttributes of InsertNode is 
not implemented");
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 55b3e939d1..c30d8f61ec 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
@@ -39,8 +38,11 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import com.google.common.util.concurrent.SettableFuture;
@@ -50,7 +52,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -153,17 +154,28 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
       throws FragmentInstanceDispatchException {
     try (SyncDataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
-      ByteBuffer buffer = instance.serializeToByteBuffer();
-      TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
-      TSendFragmentInstanceReq req =
-          new TSendFragmentInstanceReq(
-              new TFragmentInstance(buffer), groupId, 
instance.getType().toString());
-      TSendFragmentInstanceResp resp = client.sendFragmentInstance(req);
-      return resp.accepted;
+      switch (instance.getType()) {
+        case READ:
+          TSendFragmentInstanceReq sendFragmentInstanceReq =
+              new TSendFragmentInstanceReq(
+                  new TFragmentInstance(instance.serializeToByteBuffer()),
+                  instance.getRegionReplicaSet().getRegionId());
+          TSendFragmentInstanceResp sendFragmentInstanceResp =
+              client.sendFragmentInstance(sendFragmentInstanceReq);
+          return sendFragmentInstanceResp.accepted;
+        case WRITE:
+          TSendPlanNodeReq sendPlanNodeReq =
+              new TSendPlanNodeReq(
+                  new 
TPlanNode(instance.getFragment().getRoot().serializeToByteBuffer()),
+                  instance.getRegionReplicaSet().getRegionId());
+          TSendPlanNodeResp sendPlanNodeResp = 
client.sendPlanNode(sendPlanNodeReq);
+          return sendPlanNodeResp.accepted;
+      }
     } catch (IOException | TException e) {
       logger.error("can't connect to node {}", endPoint, e);
       throw new FragmentInstanceDispatchException(e);
     }
+    return false;
   }
 
   private boolean dispatchLocally(FragmentInstance instance)
@@ -198,9 +210,9 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
         }
         ConsensusWriteResponse writeResponse;
         if (groupId instanceof DataRegionId) {
-          writeResponse = DataRegionConsensusImpl.getInstance().write(groupId, 
instance);
+          writeResponse = DataRegionConsensusImpl.getInstance().write(groupId, 
planNode);
         } else {
-          writeResponse = 
SchemaRegionConsensusImpl.getInstance().write(groupId, instance);
+          writeResponse = 
SchemaRegionConsensusImpl.getInstance().write(groupId, planNode);
         }
         return TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode();
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index ab75ed7da9..5dc64a7f4a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -48,19 +47,17 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
@@ -87,6 +84,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -115,58 +114,56 @@ public class InternalServiceImpl implements 
InternalService.Iface {
   @Override
   public TSendFragmentInstanceResp 
sendFragmentInstance(TSendFragmentInstanceReq req) {
     LOGGER.info("receive FragmentInstance to group[{}]", 
req.getConsensusGroupId());
-    QueryType type = QueryType.valueOf(req.queryType);
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    switch (type) {
-      case READ:
-        ConsensusReadResponse readResponse;
-        if (groupId instanceof DataRegionId) {
-          readResponse =
-              DataRegionConsensusImpl.getInstance()
-                  .read(groupId, new 
ByteBufferConsensusRequest(req.fragmentInstance.body));
-        } else {
-          readResponse =
-              SchemaRegionConsensusImpl.getInstance()
-                  .read(groupId, new 
ByteBufferConsensusRequest(req.fragmentInstance.body));
-        }
-        if (!readResponse.isSuccess()) {
-          LOGGER.error(
-              "execute FragmentInstance in ConsensusGroup {} failed because 
{}",
-              req.getConsensusGroupId(),
-              readResponse.getException());
-          return new TSendFragmentInstanceResp(false);
-        }
-        FragmentInstanceInfo info = (FragmentInstanceInfo) 
readResponse.getDataset();
-        return new TSendFragmentInstanceResp(!info.getState().isFailed());
-      case WRITE:
-        TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
-        ConsensusWriteResponse writeResponse;
+    ConsensusReadResponse readResponse;
+    // We deserialize here instead of the underlying state machine because 
parallelism is possible
+    // here but not at the underlying state machine
+    FragmentInstance fragmentInstance = 
FragmentInstance.deserializeFrom(req.fragmentInstance.body);
+    if (groupId instanceof DataRegionId) {
+      readResponse = DataRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
+    } else {
+      readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
+    }
+    if (!readResponse.isSuccess()) {
+      LOGGER.error(
+          "execute FragmentInstance in ConsensusGroup {} failed because {}",
+          req.getConsensusGroupId(),
+          readResponse.getException());
+      return new TSendFragmentInstanceResp(false);
+    }
+    FragmentInstanceInfo info = (FragmentInstanceInfo) 
readResponse.getDataset();
+    return new TSendFragmentInstanceResp(!info.getState().isFailed());
+  }
 
-        FragmentInstance fragmentInstance =
-            FragmentInstance.deserializeFrom(req.fragmentInstance.body);
-        PlanNode planNode = fragmentInstance.getFragment().getRoot();
-        if (planNode instanceof InsertNode) {
-          try {
-            SchemaValidator.validate((InsertNode) planNode);
-          } catch (SemanticException e) {
-            response.setAccepted(false);
-            response.setMessage(e.getMessage());
-            return response;
-          }
-        }
-        if (groupId instanceof DataRegionId) {
-          writeResponse = DataRegionConsensusImpl.getInstance().write(groupId, 
fragmentInstance);
-        } else {
-          writeResponse = 
SchemaRegionConsensusImpl.getInstance().write(groupId, fragmentInstance);
-        }
-        // TODO need consider more status
-        response.setAccepted(
-            TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode());
-        response.setMessage(writeResponse.getStatus().message);
+  @Override
+  public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
+    LOGGER.info("receive PlanNode to group[{}]", req.getConsensusGroupId());
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    TSendPlanNodeResp response = new TSendPlanNodeResp();
+    ConsensusWriteResponse writeResponse;
+
+    PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
+    if (planNode instanceof InsertNode) {
+      try {
+        SchemaValidator.validate((InsertNode) planNode);
+      } catch (SemanticException e) {
+        response.setAccepted(false);
+        response.setMessage(e.getMessage());
         return response;
+      }
     }
-    return null;
+    if (groupId instanceof DataRegionId) {
+      writeResponse = DataRegionConsensusImpl.getInstance().write(groupId, 
planNode);
+    } else {
+      writeResponse = SchemaRegionConsensusImpl.getInstance().write(groupId, 
planNode);
+    }
+    // TODO need consider more status
+    response.setAccepted(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode());
+    response.setMessage(writeResponse.getStatus().message);
+    return response;
   }
 
   @Override
@@ -366,18 +363,13 @@ public class InternalServiceImpl implements 
InternalService.Iface {
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
     deleteRegionNode.setConsensusGroupId(consensusGroupId);
     deleteRegionNode.setPlanNodeId(planNodeId);
-    PlanFragmentId planFragmentId = queryId.genPlanFragmentId();
-    FragmentInstanceId fragmentInstanceId = 
planFragmentId.genFragmentInstanceId();
-    PlanFragment planFragment = new PlanFragment(planFragmentId, 
deleteRegionNode);
-    FragmentInstance fragmentInstance =
-        new FragmentInstance(planFragment, fragmentInstanceId, null, 
QueryType.WRITE);
     if (consensusGroupId instanceof DataRegionId) {
       return DataRegionConsensusImpl.getInstance()
-          .write(consensusGroupId, fragmentInstance)
+          .write(consensusGroupId, deleteRegionNode)
           .getStatus();
     } else {
       return SchemaRegionConsensusImpl.getInstance()
-          .write(consensusGroupId, fragmentInstance)
+          .write(consensusGroupId, deleteRegionNode)
           .getStatus();
     }
   }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java 
b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 96b55a90fc..811d0f0382 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -35,23 +35,18 @@ import 
org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.service.thrift.impl.InternalServiceImpl;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -139,28 +134,19 @@ public class InternalServiceImplTest {
             "meter1");
 
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
-    PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), 
createTimeSeriesNode);
-    FragmentInstance fragmentInstance =
-        new FragmentInstance(
-            planFragment,
-            planFragment.getId().genFragmentInstanceId(),
-            new GroupByFilter(1, 2, 3, 4),
-            QueryType.WRITE);
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
-    // serialize fragmentInstance
-    ByteBuffer byteBuffer = fragmentInstance.serializeToByteBuffer();
+    // serialize planNode
+    ByteBuffer byteBuffer = createTimeSeriesNode.serializeToByteBuffer();
 
-    // put serialized fragmentInstance to TSendFragmentInstanceReq
-    TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
-    TFragmentInstance tFragmentInstance = new TFragmentInstance();
-    tFragmentInstance.setBody(byteBuffer);
-    request.setFragmentInstance(tFragmentInstance);
+    // put serialized planNode to TSendPlanNodeReq
+    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TPlanNode tPlanNode = new TPlanNode();
+    tPlanNode.setBody(byteBuffer);
+    request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
-    request.setQueryType(QueryType.WRITE.toString());
 
     // Use consensus layer to execute request
-    TSendFragmentInstanceResp response = 
internalServiceImpl.sendFragmentInstance(request);
+    TSendPlanNodeResp response = internalServiceImpl.sendPlanNode(request);
 
     Assert.assertTrue(response.accepted);
   }
@@ -225,29 +211,18 @@ public class InternalServiceImplTest {
             });
 
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
-    PlanFragment planFragment =
-        new PlanFragment(new PlanFragmentId("2", 3), 
createAlignedTimeSeriesNode);
-    FragmentInstance fragmentInstance =
-        new FragmentInstance(
-            planFragment,
-            planFragment.getId().genFragmentInstanceId(),
-            new GroupByFilter(1, 2, 3, 4),
-            QueryType.WRITE);
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+    // serialize planNode
+    ByteBuffer byteBuffer = 
createAlignedTimeSeriesNode.serializeToByteBuffer();
 
-    // serialize fragmentInstance
-    ByteBuffer byteBuffer = fragmentInstance.serializeToByteBuffer();
-
-    // put serialized fragmentInstance to TSendFragmentInstanceReq
-    TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
-    TFragmentInstance tFragmentInstance = new TFragmentInstance();
-    tFragmentInstance.setBody(byteBuffer);
-    request.setFragmentInstance(tFragmentInstance);
+    // put serialized planNode to TSendPlanNodeReq
+    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TPlanNode tPlanNode = new TPlanNode();
+    tPlanNode.setBody(byteBuffer);
+    request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
-    request.setQueryType(QueryType.WRITE.toString());
 
     // Use consensus layer to execute request
-    TSendFragmentInstanceResp response = 
internalServiceImpl.sendFragmentInstance(request);
+    TSendPlanNodeResp response = internalServiceImpl.sendPlanNode(request);
 
     Assert.assertTrue(response.accepted);
   }
@@ -322,29 +297,19 @@ public class InternalServiceImplTest {
             });
 
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
-    PlanFragment planFragment =
-        new PlanFragment(new PlanFragmentId("2", 3), 
createMultiTimeSeriesNode);
-    FragmentInstance fragmentInstance =
-        new FragmentInstance(
-            planFragment,
-            planFragment.getId().genFragmentInstanceId(),
-            new GroupByFilter(1, 2, 3, 4),
-            QueryType.WRITE);
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
-    // serialize fragmentInstance
-    ByteBuffer byteBuffer = fragmentInstance.serializeToByteBuffer();
+    // serialize planNode
+    ByteBuffer byteBuffer = createMultiTimeSeriesNode.serializeToByteBuffer();
 
-    // put serialized fragmentInstance to TSendFragmentInstanceReq
-    TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
-    TFragmentInstance tFragmentInstance = new TFragmentInstance();
-    tFragmentInstance.setBody(byteBuffer);
-    request.setFragmentInstance(tFragmentInstance);
+    // put serialized planNode to TSendPlanNodeReq
+    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TPlanNode tPlanNode = new TPlanNode();
+    tPlanNode.setBody(byteBuffer);
+    request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
-    request.setQueryType(QueryType.WRITE.toString());
 
     // Use consensus layer to execute request
-    TSendFragmentInstanceResp response = 
internalServiceImpl.sendFragmentInstance(request);
+    TSendPlanNodeResp response = internalServiceImpl.sendPlanNode(request);
 
     Assert.assertTrue(response.accepted);
   }
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift 
b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 2fa382843a..5e2836c5a6 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -20,14 +20,8 @@
 include "common.thrift"
 namespace java org.apache.iotdb.consensus.multileader.thrift
 
-enum TLogType {
-   FragmentInstance,
-   InsertNode
-}
-
 struct TLogBatch {
-  1: required TLogType type
-  2: required binary data
+  1: required binary data
 }
 
 struct TSyncLogReq {
diff --git a/thrift/src/main/thrift/mpp.thrift 
b/thrift/src/main/thrift/mpp.thrift
index 702050e540..c82ace2458 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -88,10 +88,13 @@ struct TFragmentInstance {
   1: required binary body
 }
 
+struct TPlanNode {
+  1: required binary body
+}
+
 struct TSendFragmentInstanceReq {
   1: required TFragmentInstance fragmentInstance
   2: required common.TConsensusGroupId consensusGroupId
-  3: required string queryType
 }
 
 struct TSendFragmentInstanceResp {
@@ -99,6 +102,16 @@ struct TSendFragmentInstanceResp {
   2: optional string message
 }
 
+struct TSendPlanNodeReq {
+  1: required TPlanNode planNode
+  2: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TSendPlanNodeResp {
+  1: required bool accepted
+  2: optional string message
+}
+
 struct TFetchFragmentInstanceStateReq {
   1: required TFragmentInstanceId fragmentInstanceId
 }
@@ -154,8 +167,16 @@ service InternalService {
 
   // -----------------------------------For Data 
Node-----------------------------------------------
 
+  /**
+  * disptcher FragmentInstance to remote node for query request
+  */
   TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req);
 
+  /**
+  * disptcher PlanNode to remote node for write request in order to save 
resource
+  */
+  TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req);
+
   TFragmentInstanceStateResp 
fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req);
 
   TCancelResp cancelQuery(TCancelQueryReq req);

Reply via email to