This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1d63c3e688feafd2000d52c102f97e44a3a6bc63 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Aug 8 16:08:45 2022 +0800 change to parallel write in follower side --- .../org/apache/iotdb/consensus/IStateMachine.java | 7 -- .../iotdb/consensus/config/MultiLeaderConfig.java | 2 +- .../service/MultiLeaderRPCServiceProcessor.java | 24 ++--- .../consensus/standalone/StandAloneServerImpl.java | 11 -- .../apache/iotdb/consensus/EmptyStateMachine.java | 9 -- .../multileader/util/TestStateMachine.java | 7 -- .../apache/iotdb/consensus/ratis/TestUtils.java | 7 -- .../standalone/StandAloneConsensusTest.java | 8 -- .../consensus/statemachine/BaseStateMachine.java | 12 --- .../statemachine/DataRegionStateMachine.java | 119 +++++++++------------ .../plan/planner/plan/node/write/InsertNode.java | 10 ++ 11 files changed, 74 insertions(+), 142 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java index 3b5e6b1796..203b7d2a6b 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java @@ -25,10 +25,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; -import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; - -import org.apache.thrift.async.AsyncMethodCallback; import javax.annotation.concurrent.ThreadSafe; @@ -45,9 +41,6 @@ public interface IStateMachine { void stop(); - void multiLeaderWriteAsync( - List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler); - /** * apply a write-request from user * diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java index 4a95a85661..1a9b4dac9f 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java @@ -249,7 +249,7 @@ public class MultiLeaderConfig { public static class Builder { private int maxPendingRequestNumPerNode = 200; private int maxRequestPerBatch = 40; - private int maxPendingBatch = 1; + private int maxPendingBatch = 5; private int maxWaitingTimeForAccumulatingBatchInMs = 500; private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100); private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java index ad1304a608..2e00442e37 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.StepTracker; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest; import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus; import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl; @@ -40,7 +39,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface { @@ -71,7 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status))); return; } - List<IndexedConsensusRequest> indexedConsensusRequests = new LinkedList<>(); + List<TSStatus> statuses = new ArrayList<>(); // We use synchronized to ensure atomicity of executing multiple logs if (!req.getBatches().isEmpty()) { List<IConsensusRequest> consensusRequests = new ArrayList<>(); @@ -83,9 +81,11 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ : new ByteBufferConsensusRequest(batch.data); // merge TLogBatch with same search index into one request if (batch.getSearchIndex() != currentSearchIndex) { - indexedConsensusRequests.add( - impl.buildIndexedConsensusRequestForRemoteRequest( - currentSearchIndex, consensusRequests)); + statuses.add( + impl.getStateMachine() + .write( + impl.buildIndexedConsensusRequestForRemoteRequest( + currentSearchIndex, consensusRequests))); consensusRequests = new ArrayList<>(); currentSearchIndex = batch.getSearchIndex(); } @@ -93,14 +93,12 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ } // write last request if (!consensusRequests.isEmpty()) { - indexedConsensusRequests.add( - impl.buildIndexedConsensusRequestForRemoteRequest( - currentSearchIndex, consensusRequests)); + statuses.add( + impl.getStateMachine() + .write( + impl.buildIndexedConsensusRequestForRemoteRequest( + currentSearchIndex, consensusRequests))); } - long followerWriteRequestStartTime = System.nanoTime(); - impl.getStateMachine().multiLeaderWriteAsync(indexedConsensusRequests, resultHandler); - StepTracker.trace( - "followerWriteRequest", 25, followerWriteRequestStartTime, System.nanoTime()); } } catch (Exception e) { resultHandler.onError(e); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java index cd57b252ec..8438a5fbe4 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java @@ -24,13 +24,8 @@ import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; -import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; - -import org.apache.thrift.async.AsyncMethodCallback; import java.io.File; -import java.util.List; public class StandAloneServerImpl implements IStateMachine { @@ -62,12 +57,6 @@ public class StandAloneServerImpl implements IStateMachine { stateMachine.stop(); } - @Override - public void multiLeaderWriteAsync( - List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) { - throw new UnsupportedOperationException(); - } - @Override public TSStatus write(IConsensusRequest request) { return stateMachine.write(request); diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java index 86aeddd19c..01992f472f 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java @@ -22,13 +22,8 @@ package org.apache.iotdb.consensus; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; -import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; - -import org.apache.thrift.async.AsyncMethodCallback; import java.io.File; -import java.util.List; public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi { @@ -38,10 +33,6 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi @Override public void stop() {} - @Override - public void multiLeaderWriteAsync( - List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {} - @Override public TSStatus write(IConsensusRequest IConsensusRequest) { return new TSStatus(0); diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java index dc7b31f831..eab940cf87 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java @@ -25,14 +25,11 @@ import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; -import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader; import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.async.AsyncMethodCallback; - import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -68,10 +65,6 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi { @Override public void stop() {} - @Override - public void multiLeaderWriteAsync( - List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {} - @Override public TSStatus write(IConsensusRequest request) { synchronized (requestSets) { diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java index 72e741e6c9..d383fe902b 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java @@ -27,10 +27,7 @@ import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; -import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; -import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,10 +86,6 @@ public class TestUtils { @Override public void stop() {} - @Override - public void multiLeaderWriteAsync( - List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {} - @Override public TSStatus write(IConsensusRequest request) { TestRequest testRequest; diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java index b9438177f5..9e0e13be6d 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java @@ -33,7 +33,6 @@ import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; @@ -41,10 +40,8 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; import org.apache.iotdb.consensus.exception.IllegalPeerNumException; -import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; import org.apache.ratis.util.FileUtils; -import org.apache.thrift.async.AsyncMethodCallback; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -53,7 +50,6 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; -import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -101,10 +97,6 @@ public class StandAloneConsensusTest { @Override public void stop() {} - @Override - public void multiLeaderWriteAsync( - List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {} - @Override public TSStatus write(IConsensusRequest request) { if (request instanceof ByteBufferConsensusRequest) { diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java index 27d696129f..3bd74bd91a 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java @@ -23,31 +23,19 @@ import org.apache.iotdb.commons.StepTracker; import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest; -import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.wal.buffer.WALEntry; -import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.NotSupportedException; - -import java.util.List; - public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi { private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class); - public void multiLeaderWriteAsync( - List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) { - throw new NotSupportedException(); - } - protected FragmentInstance getFragmentInstance(IConsensusRequest request) { FragmentInstance instance; if (request instanceof ByteBufferConsensusRequest) { diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index 4744163a83..4f53926670 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; -import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; +import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader; import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan; import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor; import org.apache.iotdb.db.engine.StorageEngineV2; @@ -44,14 +44,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.async.AsyncMethodCallback; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; -import java.util.LinkedList; +import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; @@ -64,13 +63,15 @@ public class DataRegionStateMachine extends BaseStateMachine { private DataRegion region; - private static final int MAX_REQUEST_CACHE_SIZE = 1; + private static final int MAX_REQUEST_CACHE_SIZE = 5; private static final long CACHE_WINDOW_TIME_IN_MS = 10_000; - private final PriorityQueue<InsertNodeWrapper> requestCache; + + private final PriorityQueue<InsertNode> requestCache; + private long nextSyncIndex = -1; public DataRegionStateMachine(DataRegion region) { this.region = region; - this.requestCache = new PriorityQueue<>(); + this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex)); } @Override @@ -115,31 +116,49 @@ public class DataRegionStateMachine extends BaseStateMachine { } } - private InsertNodeWrapper cacheAndGetLatestInsertNode( - long syncIndex, - List<InsertNode> insertNodes, - AsyncMethodCallback<TSyncLogRes> resultHandler) { + private TSStatus cacheAndInsertLatestNode(long syncIndex, InsertNode insertNode) { + long cacheRequestStartTime = System.nanoTime(); + insertNode.setSyncIndex(syncIndex); synchronized (requestCache) { - requestCache.add(new InsertNodeWrapper(syncIndex, insertNodes, resultHandler)); + requestCache.add(insertNode); + // If the peek is not hold by current thread, it should notify the corresponding thread to + // process the peek when the queue is full if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) { - return requestCache.poll(); + requestCache.notifyAll(); } - return null; + while (true) { + if (insertNode.getSyncIndex() == nextSyncIndex) { + requestCache.remove(insertNode); + nextSyncIndex++; + break; + } + if (requestCache.size() == MAX_REQUEST_CACHE_SIZE + && requestCache.peek().getSyncIndex() == insertNode.getSyncIndex()) { + requestCache.remove(); + nextSyncIndex = insertNode.getSyncIndex() + 1; + break; + } + try { + requestCache.wait(CACHE_WINDOW_TIME_IN_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, System.nanoTime()); + logger.info("queue size {}, syncIndex = {}", requestCache.size(), insertNode.getSyncIndex()); + TSStatus tsStatus = write(insertNode); + requestCache.notifyAll(); + return tsStatus; } } private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> { private final long syncIndex; - private final List<InsertNode> insertNodes; - private final AsyncMethodCallback<TSyncLogRes> resultHandler; + private final InsertNode insertNode; - public InsertNodeWrapper( - long syncIndex, - List<InsertNode> insertNode, - AsyncMethodCallback<TSyncLogRes> resultHandler) { + public InsertNodeWrapper(long syncIndex, InsertNode insertNode) { this.syncIndex = syncIndex; - this.insertNodes = insertNode; - this.resultHandler = resultHandler; + this.insertNode = insertNode; } @Override @@ -151,51 +170,8 @@ public class DataRegionStateMachine extends BaseStateMachine { return syncIndex; } - public List<InsertNode> getInsertNodes() { - return insertNodes; - } - - public AsyncMethodCallback<TSyncLogRes> getResultHandler() { - return resultHandler; - } - } - - public void multiLeaderWriteAsync( - List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) { - long prepareStartTime = System.nanoTime(); - List<TSStatus> statuses = new LinkedList<>(); - try { - List<InsertNode> insertNodesInAllRequests = new LinkedList<>(); - for (IndexedConsensusRequest indexedRequest : requests) { - List<InsertNode> insertNodesInOneRequest = - new ArrayList<>(indexedRequest.getRequests().size()); - for (IConsensusRequest req : indexedRequest.getRequests()) { - // PlanNode in IndexedConsensusRequest should always be InsertNode - InsertNode innerNode = (InsertNode) getPlanNode(req); - innerNode.setSearchIndex(indexedRequest.getSearchIndex()); - insertNodesInOneRequest.add(innerNode); - } - insertNodesInAllRequests.add(mergeInsertNodes(insertNodesInOneRequest)); - } - long startTime = System.nanoTime(); - InsertNodeWrapper insertNodeWrapper = - cacheAndGetLatestInsertNode( - requests.get(0).getSyncIndex(), insertNodesInAllRequests, resultHandler); - StepTracker.trace("cacheAndGet", 25, startTime, System.nanoTime()); - StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime()); - long writeStartTime = System.nanoTime(); - if (insertNodeWrapper != null) { - for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) { - statuses.add(write(insertNode)); - } - insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses)); - } else { - logger.error("insertNodeWrapper is null"); - } - StepTracker.trace("followerWriteInsert", 25, writeStartTime, System.nanoTime()); - } catch (IllegalArgumentException e) { - logger.error(e.getMessage(), e); - resultHandler.onError(e); + public InsertNode getInsertNode() { + return insertNode; } } @@ -212,7 +188,16 @@ public class DataRegionStateMachine extends BaseStateMachine { innerNode.setSearchIndex(indexedRequest.getSearchIndex()); insertNodes.add(innerNode); } - planNode = mergeInsertNodes(insertNodes); + if (indexedRequest.getSearchIndex() == ConsensusReqReader.DEFAULT_SEARCH_INDEX) { + + TSStatus status = + cacheAndInsertLatestNode( + indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes)); + + return status; + } else { + planNode = mergeInsertNodes(insertNodes); + } } else { planNode = getPlanNode(request); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java index f5b5bddb52..787dfbf9ad 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java @@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode { */ protected long searchIndex = NO_CONSENSUS_INDEX; + protected long syncIndex = NO_CONSENSUS_INDEX; + /** Physical address of data region after splitting */ protected TRegionReplicaSet dataRegionReplicaSet; @@ -153,6 +155,14 @@ public abstract class InsertNode extends WritePlanNode { return searchIndex; } + public void setSyncIndex(long syncIndex) { + this.syncIndex = syncIndex; + } + + public long getSyncIndex() { + return syncIndex; + } + /** Search index should start from 1 */ public void setSearchIndex(long searchIndex) { this.searchIndex = searchIndex;
