This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e04af828e7 [IOTDB-3385] Reduce the serialization size for the Datanode consensus layer (#6209)
e04af828e7 is described below
commit e04af828e768ae3c7fb216366cca5a9e1642e0ff
Author: Potato <[email protected]>
AuthorDate: Thu Jun 9 20:42:45 2022 +0800
[IOTDB-3385] Reduce the serialization size for the Datanode consensus layer (#6209)
---
.../common/request/IndexedConsensusRequest.java | 15 +--
.../apache/iotdb/consensus/config/RatisConfig.java | 4 +-
.../multileader/MultiLeaderServerImpl.java | 13 +--
.../multileader/logdispatcher/LogDispatcher.java | 5 +-
.../service/MultiLeaderRPCServiceProcessor.java | 2 +-
.../multileader/MultiLeaderConsensusTest.java | 2 -
.../consensus/statemachine/BaseStateMachine.java | 15 +++
.../statemachine/DataRegionStateMachine.java | 13 +--
.../statemachine/SchemaRegionStateMachine.java | 4 +-
.../plan/planner/plan/node/DeleteRegionNode.java | 19 ----
.../db/mpp/plan/planner/plan/node/PlanNode.java | 26 ++++-
.../plan/planner/plan/node/write/InsertNode.java | 22 +----
.../scheduler/FragmentInstanceDispatcherImpl.java | 34 ++++---
.../service/thrift/impl/InternalServiceImpl.java | 108 ++++++++++-----------
.../iotdb/db/service/InternalServiceImplTest.java | 89 ++++++-----------
.../src/main/thrift/mutlileader.thrift | 8 +-
thrift/src/main/thrift/mpp.thrift | 23 ++++-
17 files changed, 177 insertions(+), 225 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 073c0f4b64..3578fd5656 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.consensus.common.request;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
-
import java.nio.ByteBuffer;
import java.util.Objects;
@@ -32,16 +30,12 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
private final long safelyDeletedSearchIndex;
- /** we do not need to serialize this field as it will be serialized by
TLogBatch. */
- private final TLogType type;
-
private final IConsensusRequest request;
public IndexedConsensusRequest(
- long searchIndex, long safelyDeletedSearchIndex, TLogType type,
IConsensusRequest request) {
+ long searchIndex, long safelyDeletedSearchIndex, IConsensusRequest
request) {
this.searchIndex = searchIndex;
this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
- this.type = type;
this.request = request;
}
@@ -62,10 +56,6 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
return safelyDeletedSearchIndex;
}
- public TLogType getType() {
- return type;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -77,12 +67,11 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
IndexedConsensusRequest that = (IndexedConsensusRequest) o;
return searchIndex == that.searchIndex
&& safelyDeletedSearchIndex == that.safelyDeletedSearchIndex
- && type == that.type
&& Objects.equals(request, that.request);
}
@Override
public int hashCode() {
- return Objects.hash(searchIndex, safelyDeletedSearchIndex, type, request);
+ return Objects.hash(searchIndex, safelyDeletedSearchIndex, request);
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index 048fd21d76..c62e789f2b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -173,8 +173,8 @@ public class RatisConfig {
}
public static class Builder {
- private TimeDuration timeoutMin = TimeDuration.valueOf(2,
TimeUnit.SECONDS);
- private TimeDuration timeoutMax = TimeDuration.valueOf(8,
TimeUnit.SECONDS);
+ private TimeDuration timeoutMin = TimeDuration.valueOf(500,
TimeUnit.MILLISECONDS);
+ private TimeDuration timeoutMax = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
private TimeDuration requestTimeout = TimeDuration.valueOf(20,
TimeUnit.SECONDS);
private TimeDuration sleepTime = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
private TimeDuration slownessTimeout = TimeDuration.valueOf(10,
TimeUnit.MINUTES);
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index a999de531f..596e72bd53 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -29,7 +29,6 @@ import
org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.ratis.Utils;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -153,19 +152,13 @@ public class MultiLeaderServerImpl {
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
return new IndexedConsensusRequest(
- controller.incrementAndGet(),
- getCurrentSafelyDeletedSearchIndex(),
- TLogType.FragmentInstance,
- request);
+ controller.incrementAndGet(), getCurrentSafelyDeletedSearchIndex(),
request);
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
- ByteBufferConsensusRequest request, TLogType type) {
+ ByteBufferConsensusRequest request) {
return new IndexedConsensusRequest(
- ConsensusReqReader.DEFAULT_SEARCH_INDEX,
- getCurrentSafelyDeletedSearchIndex(),
- type,
- request);
+ ConsensusReqReader.DEFAULT_SEARCH_INDEX,
getCurrentSafelyDeletedSearchIndex(), request);
}
/**
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 88e42fbb7f..8a6a94969e 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -31,7 +31,6 @@ import
org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClie
import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
import
org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
@@ -281,7 +280,7 @@ public class LogDispatcher {
if (data != null) {
// since WAL can no longer recover FragmentInstance, but only
PlanNode, we need to give
// special flags to use different deserialization methods in the
dataRegion stateMachine
- logBatches.add(new TLogBatch(TLogType.InsertNode,
data.serializeToByteBuffer()));
+ logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
}
}
return currentIndex - 1;
@@ -289,7 +288,7 @@ public class LogDispatcher {
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, List<TLogBatch> logBatches) {
- logBatches.add(new TLogBatch(TLogType.FragmentInstance,
request.serializeToByteBuffer()));
+ logBatches.add(new TLogBatch(request.serializeToByteBuffer()));
}
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 05a2c4cb03..d829f403d0 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -71,7 +71,7 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
impl.getStateMachine()
.write(
impl.buildIndexedConsensusRequestForRemoteRequest(
- new ByteBufferConsensusRequest(batch.data),
batch.type)));
+ new ByteBufferConsensusRequest(batch.data))));
}
}
logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId, statuses);
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index b03ffa8df0..fa86215466 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -33,7 +33,6 @@ import
org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -374,7 +373,6 @@ public class MultiLeaderConsensusTest {
new IndexedConsensusRequest(
((IndexedConsensusRequest) request).getSearchIndex(),
-1,
- TLogType.FragmentInstance,
new TestEntry(buffer.getInt(), Peer.deserialize(buffer))));
} else {
requestSet.add(((IndexedConsensusRequest) request));
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index e42e393788..6d8dc5f62b 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,4 +45,17 @@ public abstract class BaseStateMachine implements
IStateMachine, IStateMachine.E
}
return instance;
}
+
+ protected PlanNode getPlanNode(IConsensusRequest request) {
+ PlanNode node;
+ if (request instanceof ByteBufferConsensusRequest) {
+ node = PlanNodeType.deserialize(request.serializeToByteBuffer());
+ } else if (request instanceof PlanNode) {
+ node = (PlanNode) request;
+ } else {
+ logger.error("Unexpected IConsensusRequest : {}", request);
+ throw new IllegalArgumentException("Unexpected IConsensusRequest!");
+ }
+ return node;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 1cf07f56d4..ad33a65ae2 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.thrift.TLogType;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -34,7 +33,6 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -104,14 +102,7 @@ public class DataRegionStateMachine extends
BaseStateMachine {
try {
if (request instanceof IndexedConsensusRequest) {
IndexedConsensusRequest indexedConsensusRequest =
(IndexedConsensusRequest) request;
- if (indexedConsensusRequest.getType() == TLogType.InsertNode) {
- planNode =
- PlanNodeType.deserialize(
-
indexedConsensusRequest.getRequest().serializeToByteBuffer());
- } else {
- planNode =
-
getFragmentInstance(indexedConsensusRequest.getRequest()).getFragment().getRoot();
- }
+ planNode = getPlanNode(indexedConsensusRequest.getRequest());
if (planNode instanceof InsertNode) {
((InsertNode) planNode)
.setSearchIndex(((IndexedConsensusRequest)
request).getSearchIndex());
@@ -120,7 +111,7 @@ public class DataRegionStateMachine extends
BaseStateMachine {
((IndexedConsensusRequest)
request).getSafelyDeletedSearchIndex());
}
} else {
- planNode = getFragmentInstance(request).getFragment().getRoot();
+ planNode = getPlanNode(request);
}
return write(planNode);
} catch (IllegalArgumentException e) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 930093f7a6..93b2a1d06a 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -26,7 +26,6 @@ import
org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -66,8 +65,7 @@ public class SchemaRegionStateMachine extends
BaseStateMachine {
public TSStatus write(IConsensusRequest request) {
logger.info("Execute write plan in SchemaRegionStateMachine");
try {
- PlanNode planNode = getFragmentInstance(request).getFragment().getRoot();
- return planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
+ return getPlanNode(request).accept(new SchemaExecutionVisitor(),
schemaRegion);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/DeleteRegionNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/DeleteRegionNode.java
index 17f020c64d..ff1bb66f7e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/DeleteRegionNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/DeleteRegionNode.java
@@ -22,14 +22,9 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,8 +32,6 @@ import java.util.List;
public class DeleteRegionNode extends WritePlanNode implements
IConsensusRequest {
- private final Logger logger =
LoggerFactory.getLogger(DeleteRegionNode.class);
-
protected ConsensusGroupId consensusGroupId;
public DeleteRegionNode(PlanNodeId id) {
@@ -117,16 +110,4 @@ public class DeleteRegionNode extends WritePlanNode
implements IConsensusRequest
ReadWriteIOUtils.write(consensusGroupId.getType().getValue(), stream);
ReadWriteIOUtils.write(consensusGroupId.getId(), stream);
}
-
- @Override
- public ByteBuffer serializeToByteBuffer() {
- try (PublicBAOS publicBAOS = new PublicBAOS();
- DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
- super.serialize(outputStream);
- return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
- } catch (IOException e) {
- logger.error("Unexpected error occurs when serializing this
DeleteRegionNode.", e);
- throw new SerializationRunTimeException(e);
- }
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index 07f8746ee9..9b0d600cdb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -18,9 +18,14 @@
*/
package org.apache.iotdb.db.mpp.plan.planner.plan.node;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.commons.lang.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,7 +36,10 @@ import java.util.Objects;
import static java.util.Objects.requireNonNull;
/** The base class of query logical plan nodes, which is used to compose
logical query plan. */
-public abstract class PlanNode {
+public abstract class PlanNode implements IConsensusRequest {
+
+ private final Logger logger = LoggerFactory.getLogger(PlanNode.class);
+
protected static final int NO_CHILD_ALLOWED = 0;
protected static final int ONE_CHILD = 1;
protected static final int CHILD_COUNT_NO_LIMIT = -1;
@@ -113,6 +121,22 @@ public abstract class PlanNode {
}
}
+ /**
+ * Deserialize via {@link
+ *
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType#deserialize(ByteBuffer)}
+ */
+ @Override
+ public ByteBuffer serializeToByteBuffer() {
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ serialize(outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ } catch (IOException e) {
+ logger.error("Unexpected error occurs when serializing writePlanNode.",
e);
+ throw new SerializationRunTimeException(e);
+ }
+ }
+
protected abstract void serializeAttributes(ByteBuffer byteBuffer);
protected abstract void serializeAttributes(DataOutputStream stream) throws
IOException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index d880cf23ad..92dc6e6e63 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -20,10 +20,8 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -32,7 +30,6 @@ import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -49,7 +46,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
-public abstract class InsertNode extends WritePlanNode implements
IConsensusRequest {
+public abstract class InsertNode extends WritePlanNode {
private final Logger logger = LoggerFactory.getLogger(InsertNode.class);
/** this insert node doesn't need to participate in multi-leader consensus */
@@ -179,23 +176,6 @@ public abstract class InsertNode extends WritePlanNode
implements IConsensusRequ
this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
}
- /**
- * Deserialize via {@link
- *
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType#deserialize(ByteBuffer)}
- */
- @Override
- public ByteBuffer serializeToByteBuffer() {
- try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
- DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- serializeAttributes(outputStream);
- getPlanNodeId().serialize(outputStream);
- return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
- } catch (IOException e) {
- logger.error("Unexpected error occurs when serializing this
InsertNode.", e);
- throw new SerializationRunTimeException(e);
- }
- }
-
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
throw new NotImplementedException("serializeAttributes of InsertNode is
not implemented");
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 55b3e939d1..c30d8f61ec 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.plan.scheduler;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
@@ -39,8 +38,11 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.rpc.TSStatusCode;
import com.google.common.util.concurrent.SettableFuture;
@@ -50,7 +52,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -153,17 +154,28 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
throws FragmentInstanceDispatchException {
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- ByteBuffer buffer = instance.serializeToByteBuffer();
- TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
- TSendFragmentInstanceReq req =
- new TSendFragmentInstanceReq(
- new TFragmentInstance(buffer), groupId,
instance.getType().toString());
- TSendFragmentInstanceResp resp = client.sendFragmentInstance(req);
- return resp.accepted;
+ switch (instance.getType()) {
+ case READ:
+ TSendFragmentInstanceReq sendFragmentInstanceReq =
+ new TSendFragmentInstanceReq(
+ new TFragmentInstance(instance.serializeToByteBuffer()),
+ instance.getRegionReplicaSet().getRegionId());
+ TSendFragmentInstanceResp sendFragmentInstanceResp =
+ client.sendFragmentInstance(sendFragmentInstanceReq);
+ return sendFragmentInstanceResp.accepted;
+ case WRITE:
+ TSendPlanNodeReq sendPlanNodeReq =
+ new TSendPlanNodeReq(
+ new
TPlanNode(instance.getFragment().getRoot().serializeToByteBuffer()),
+ instance.getRegionReplicaSet().getRegionId());
+ TSendPlanNodeResp sendPlanNodeResp =
client.sendPlanNode(sendPlanNodeReq);
+ return sendPlanNodeResp.accepted;
+ }
} catch (IOException | TException e) {
logger.error("can't connect to node {}", endPoint, e);
throw new FragmentInstanceDispatchException(e);
}
+ return false;
}
private boolean dispatchLocally(FragmentInstance instance)
@@ -198,9 +210,9 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
}
ConsensusWriteResponse writeResponse;
if (groupId instanceof DataRegionId) {
- writeResponse = DataRegionConsensusImpl.getInstance().write(groupId,
instance);
+ writeResponse = DataRegionConsensusImpl.getInstance().write(groupId,
planNode);
} else {
- writeResponse =
SchemaRegionConsensusImpl.getInstance().write(groupId, instance);
+ writeResponse =
SchemaRegionConsensusImpl.getInstance().write(groupId, planNode);
}
return TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
writeResponse.getStatus().getCode();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index ab75ed7da9..5dc64a7f4a 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -48,19 +47,17 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
@@ -87,6 +84,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -115,58 +114,56 @@ public class InternalServiceImpl implements
InternalService.Iface {
@Override
public TSendFragmentInstanceResp
sendFragmentInstance(TSendFragmentInstanceReq req) {
LOGGER.info("receive FragmentInstance to group[{}]",
req.getConsensusGroupId());
- QueryType type = QueryType.valueOf(req.queryType);
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- switch (type) {
- case READ:
- ConsensusReadResponse readResponse;
- if (groupId instanceof DataRegionId) {
- readResponse =
- DataRegionConsensusImpl.getInstance()
- .read(groupId, new
ByteBufferConsensusRequest(req.fragmentInstance.body));
- } else {
- readResponse =
- SchemaRegionConsensusImpl.getInstance()
- .read(groupId, new
ByteBufferConsensusRequest(req.fragmentInstance.body));
- }
- if (!readResponse.isSuccess()) {
- LOGGER.error(
- "execute FragmentInstance in ConsensusGroup {} failed because
{}",
- req.getConsensusGroupId(),
- readResponse.getException());
- return new TSendFragmentInstanceResp(false);
- }
- FragmentInstanceInfo info = (FragmentInstanceInfo)
readResponse.getDataset();
- return new TSendFragmentInstanceResp(!info.getState().isFailed());
- case WRITE:
- TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
- ConsensusWriteResponse writeResponse;
+ ConsensusReadResponse readResponse;
+ // We deserialize here instead of the underlying state machine because
parallelism is possible
+ // here but not at the underlying state machine
+ FragmentInstance fragmentInstance =
FragmentInstance.deserializeFrom(req.fragmentInstance.body);
+ if (groupId instanceof DataRegionId) {
+ readResponse = DataRegionConsensusImpl.getInstance().read(groupId,
fragmentInstance);
+ } else {
+ readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId,
fragmentInstance);
+ }
+ if (!readResponse.isSuccess()) {
+ LOGGER.error(
+ "execute FragmentInstance in ConsensusGroup {} failed because {}",
+ req.getConsensusGroupId(),
+ readResponse.getException());
+ return new TSendFragmentInstanceResp(false);
+ }
+ FragmentInstanceInfo info = (FragmentInstanceInfo)
readResponse.getDataset();
+ return new TSendFragmentInstanceResp(!info.getState().isFailed());
+ }
- FragmentInstance fragmentInstance =
- FragmentInstance.deserializeFrom(req.fragmentInstance.body);
- PlanNode planNode = fragmentInstance.getFragment().getRoot();
- if (planNode instanceof InsertNode) {
- try {
- SchemaValidator.validate((InsertNode) planNode);
- } catch (SemanticException e) {
- response.setAccepted(false);
- response.setMessage(e.getMessage());
- return response;
- }
- }
- if (groupId instanceof DataRegionId) {
- writeResponse = DataRegionConsensusImpl.getInstance().write(groupId,
fragmentInstance);
- } else {
- writeResponse =
SchemaRegionConsensusImpl.getInstance().write(groupId, fragmentInstance);
- }
- // TODO need consider more status
- response.setAccepted(
- TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
writeResponse.getStatus().getCode());
- response.setMessage(writeResponse.getStatus().message);
+ @Override
+ public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
+ LOGGER.info("receive PlanNode to group[{}]", req.getConsensusGroupId());
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ TSendPlanNodeResp response = new TSendPlanNodeResp();
+ ConsensusWriteResponse writeResponse;
+
+ PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
+ if (planNode instanceof InsertNode) {
+ try {
+ SchemaValidator.validate((InsertNode) planNode);
+ } catch (SemanticException e) {
+ response.setAccepted(false);
+ response.setMessage(e.getMessage());
return response;
+ }
}
- return null;
+ if (groupId instanceof DataRegionId) {
+ writeResponse = DataRegionConsensusImpl.getInstance().write(groupId,
planNode);
+ } else {
+ writeResponse = SchemaRegionConsensusImpl.getInstance().write(groupId,
planNode);
+ }
+ // TODO need consider more status
+ response.setAccepted(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
writeResponse.getStatus().getCode());
+ response.setMessage(writeResponse.getStatus().message);
+ return response;
}
@Override
@@ -366,18 +363,13 @@ public class InternalServiceImpl implements
InternalService.Iface {
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
deleteRegionNode.setConsensusGroupId(consensusGroupId);
deleteRegionNode.setPlanNodeId(planNodeId);
- PlanFragmentId planFragmentId = queryId.genPlanFragmentId();
- FragmentInstanceId fragmentInstanceId =
planFragmentId.genFragmentInstanceId();
- PlanFragment planFragment = new PlanFragment(planFragmentId,
deleteRegionNode);
- FragmentInstance fragmentInstance =
- new FragmentInstance(planFragment, fragmentInstanceId, null,
QueryType.WRITE);
if (consensusGroupId instanceof DataRegionId) {
return DataRegionConsensusImpl.getInstance()
- .write(consensusGroupId, fragmentInstance)
+ .write(consensusGroupId, deleteRegionNode)
.getStatus();
} else {
return SchemaRegionConsensusImpl.getInstance()
- .write(consensusGroupId, fragmentInstance)
+ .write(consensusGroupId, deleteRegionNode)
.getStatus();
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 96b55a90fc..811d0f0382 100644
---
a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -35,23 +35,18 @@ import
org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.service.thrift.impl.InternalServiceImpl;
import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -139,28 +134,19 @@ public class InternalServiceImplTest {
"meter1");
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
- PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3),
createTimeSeriesNode);
- FragmentInstance fragmentInstance =
- new FragmentInstance(
- planFragment,
- planFragment.getId().genFragmentInstanceId(),
- new GroupByFilter(1, 2, 3, 4),
- QueryType.WRITE);
- fragmentInstance.setDataRegionAndHost(regionReplicaSet);
- // serialize fragmentInstance
- ByteBuffer byteBuffer = fragmentInstance.serializeToByteBuffer();
+ // serialize planNode
+ ByteBuffer byteBuffer = createTimeSeriesNode.serializeToByteBuffer();
- // put serialized fragmentInstance to TSendFragmentInstanceReq
- TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
- TFragmentInstance tFragmentInstance = new TFragmentInstance();
- tFragmentInstance.setBody(byteBuffer);
- request.setFragmentInstance(tFragmentInstance);
+ // put serialized planNode to TSendPlanNodeReq
+ TSendPlanNodeReq request = new TSendPlanNodeReq();
+ TPlanNode tPlanNode = new TPlanNode();
+ tPlanNode.setBody(byteBuffer);
+ request.setPlanNode(tPlanNode);
request.setConsensusGroupId(regionReplicaSet.getRegionId());
- request.setQueryType(QueryType.WRITE.toString());
// Use consensus layer to execute request
- TSendFragmentInstanceResp response =
internalServiceImpl.sendFragmentInstance(request);
+ TSendPlanNodeResp response = internalServiceImpl.sendPlanNode(request);
Assert.assertTrue(response.accepted);
}
@@ -225,29 +211,18 @@ public class InternalServiceImplTest {
});
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
- PlanFragment planFragment =
- new PlanFragment(new PlanFragmentId("2", 3),
createAlignedTimeSeriesNode);
- FragmentInstance fragmentInstance =
- new FragmentInstance(
- planFragment,
- planFragment.getId().genFragmentInstanceId(),
- new GroupByFilter(1, 2, 3, 4),
- QueryType.WRITE);
- fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+ // serialize planNode
+ ByteBuffer byteBuffer =
createAlignedTimeSeriesNode.serializeToByteBuffer();
- // serialize fragmentInstance
- ByteBuffer byteBuffer = fragmentInstance.serializeToByteBuffer();
-
- // put serialized fragmentInstance to TSendFragmentInstanceReq
- TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
- TFragmentInstance tFragmentInstance = new TFragmentInstance();
- tFragmentInstance.setBody(byteBuffer);
- request.setFragmentInstance(tFragmentInstance);
+ // put serialized planNode to TSendPlanNodeReq
+ TSendPlanNodeReq request = new TSendPlanNodeReq();
+ TPlanNode tPlanNode = new TPlanNode();
+ tPlanNode.setBody(byteBuffer);
+ request.setPlanNode(tPlanNode);
request.setConsensusGroupId(regionReplicaSet.getRegionId());
- request.setQueryType(QueryType.WRITE.toString());
// Use consensus layer to execute request
- TSendFragmentInstanceResp response =
internalServiceImpl.sendFragmentInstance(request);
+ TSendPlanNodeResp response = internalServiceImpl.sendPlanNode(request);
Assert.assertTrue(response.accepted);
}
@@ -322,29 +297,19 @@ public class InternalServiceImplTest {
});
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
- PlanFragment planFragment =
- new PlanFragment(new PlanFragmentId("2", 3),
createMultiTimeSeriesNode);
- FragmentInstance fragmentInstance =
- new FragmentInstance(
- planFragment,
- planFragment.getId().genFragmentInstanceId(),
- new GroupByFilter(1, 2, 3, 4),
- QueryType.WRITE);
- fragmentInstance.setDataRegionAndHost(regionReplicaSet);
- // serialize fragmentInstance
- ByteBuffer byteBuffer = fragmentInstance.serializeToByteBuffer();
+ // serialize planNode
+ ByteBuffer byteBuffer = createMultiTimeSeriesNode.serializeToByteBuffer();
- // put serialized fragmentInstance to TSendFragmentInstanceReq
- TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
- TFragmentInstance tFragmentInstance = new TFragmentInstance();
- tFragmentInstance.setBody(byteBuffer);
- request.setFragmentInstance(tFragmentInstance);
+ // put serialized planNode to TSendPlanNodeReq
+ TSendPlanNodeReq request = new TSendPlanNodeReq();
+ TPlanNode tPlanNode = new TPlanNode();
+ tPlanNode.setBody(byteBuffer);
+ request.setPlanNode(tPlanNode);
request.setConsensusGroupId(regionReplicaSet.getRegionId());
- request.setQueryType(QueryType.WRITE.toString());
// Use consensus layer to execute request
- TSendFragmentInstanceResp response =
internalServiceImpl.sendFragmentInstance(request);
+ TSendPlanNodeResp response = internalServiceImpl.sendPlanNode(request);
Assert.assertTrue(response.accepted);
}
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 2fa382843a..5e2836c5a6 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -20,14 +20,8 @@
include "common.thrift"
namespace java org.apache.iotdb.consensus.multileader.thrift
-enum TLogType {
- FragmentInstance,
- InsertNode
-}
-
struct TLogBatch {
- 1: required TLogType type
- 2: required binary data
+ 1: required binary data
}
struct TSyncLogReq {
diff --git a/thrift/src/main/thrift/mpp.thrift
b/thrift/src/main/thrift/mpp.thrift
index 702050e540..c82ace2458 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -88,10 +88,13 @@ struct TFragmentInstance {
1: required binary body
}
+struct TPlanNode {
+ 1: required binary body
+}
+
struct TSendFragmentInstanceReq {
1: required TFragmentInstance fragmentInstance
2: required common.TConsensusGroupId consensusGroupId
- 3: required string queryType
}
struct TSendFragmentInstanceResp {
@@ -99,6 +102,16 @@ struct TSendFragmentInstanceResp {
2: optional string message
}
+struct TSendPlanNodeReq {
+ 1: required TPlanNode planNode
+ 2: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TSendPlanNodeResp {
+ 1: required bool accepted
+ 2: optional string message
+}
+
struct TFetchFragmentInstanceStateReq {
1: required TFragmentInstanceId fragmentInstanceId
}
@@ -154,8 +167,16 @@ service InternalService {
// -----------------------------------For Data
Node-----------------------------------------------
+ /**
+ * Dispatch a FragmentInstance to a remote node for a query request
+ */
TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req);
+ /**
+ * Dispatch a PlanNode to a remote node for a write request in order to save
resources
+ */
+ TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req);
+
TFragmentInstanceStateResp
fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req);
TCancelResp cancelQuery(TCancelQueryReq req);