This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 065840b769d7424215ee7e3bfaf708c4568a818b Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Nov 9 21:19:35 2022 +0800 Control the size of each pending batch to be no larger than maxSizePerBatch Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../common/request/IndexedConsensusRequest.java | 26 +++---- .../iotdb/consensus/config/MultiLeaderConfig.java | 38 +++++++--- .../multileader/MultiLeaderServerImpl.java | 2 +- .../multileader/logdispatcher/LogDispatcher.java | 82 +++++++++++----------- .../multileader/logdispatcher/PendingBatch.java | 40 ++++++++--- .../multileader/logdispatcher/SyncStatus.java | 4 +- .../multileader/logdispatcher/SyncStatusTest.java | 21 ++++-- .../plan/node/write/InsertMultiTabletsNode.java | 2 +- 8 files changed, 131 insertions(+), 84 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java index 6abca549b6..ca236da1c3 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java @@ -20,7 +20,7 @@ package org.apache.iotdb.consensus.common.request; import java.nio.ByteBuffer; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -31,8 +31,8 @@ public class IndexedConsensusRequest implements IConsensusRequest { private final long searchIndex; private final long syncIndex; - private List<IConsensusRequest> requests; - private List<ByteBuffer> serializedRequests; + private final List<IConsensusRequest> requests; + private final List<ByteBuffer> serializedRequests = new ArrayList<>(); private long serializedSize = 0; public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) { @@ -41,15 +41,6 @@ public class IndexedConsensusRequest implements IConsensusRequest { this.syncIndex = -1L; } - public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) { - this.searchIndex = searchIndex; - this.serializedRequests = serializedRequests; - for (ByteBuffer byteBuffer : serializedRequests) { - serializedSize += byteBuffer.capacity(); - } - this.syncIndex = -1L; - } - public IndexedConsensusRequest( long searchIndex, long syncIndex, List<IConsensusRequest> requests) { this.searchIndex = searchIndex; @@ -70,10 +61,13 @@ public class IndexedConsensusRequest implements IConsensusRequest { return serializedRequests; } - public List<ByteBuffer> buildSerializedRequests() { - List<ByteBuffer> result = new LinkedList<>(); - this.requests.forEach(r -> result.add(r.serializeToByteBuffer())); - return result; + public void buildSerializedRequests() { + this.requests.forEach( + r -> { + ByteBuffer buffer = r.serializeToByteBuffer(); + this.serializedRequests.add(buffer); + this.serializedSize += buffer.capacity(); + }); } public long getSerializedSize() { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java index cd0d58c4ae..8c7abec0ce 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java @@ -196,7 +196,9 @@ public class MultiLeaderConfig { } public static class Replication { - private final int maxRequestPerBatch; + private final int maxRequestNumPerBatch; + + private final int maxSizePerBatch; private final int maxPendingBatch; private final int maxWaitingTimeForAccumulatingBatchInMs; private final long basicRetryWaitTimeMs; @@ -207,7 +209,8 @@ public class MultiLeaderConfig { private final Long allocateMemoryForConsensus; private Replication( - int maxRequestPerBatch, + int maxRequestNumPerBatch, + int maxSizePerBatch, int maxPendingBatch, int maxWaitingTimeForAccumulatingBatchInMs, long basicRetryWaitTimeMs, @@ -216,7 +219,8 @@ public class MultiLeaderConfig { long throttleTimeOutMs, long checkpointGap, long allocateMemoryForConsensus) { - this.maxRequestPerBatch = maxRequestPerBatch; + this.maxRequestNumPerBatch = maxRequestNumPerBatch; + this.maxSizePerBatch = maxSizePerBatch; this.maxPendingBatch = maxPendingBatch; this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs; this.basicRetryWaitTimeMs = basicRetryWaitTimeMs; @@ -227,8 +231,12 @@ public class MultiLeaderConfig { this.allocateMemoryForConsensus = allocateMemoryForConsensus; } - public int getMaxRequestPerBatch() { - return maxRequestPerBatch; + public int getMaxRequestNumPerBatch() { + return maxRequestNumPerBatch; + } + + public int getMaxSizePerBatch() { + return maxSizePerBatch; } public int getMaxPendingBatch() { @@ -268,7 +276,8 @@ public class MultiLeaderConfig { } public static class Builder { - private int maxRequestPerBatch = 30; + private int maxRequestNumPerBatch = 30; + private int maxSizePerBatch = 4 * 1024 * 1024; // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE // in DataRegionStateMachine private int maxPendingBatch = 5; @@ -280,8 +289,13 @@ public class MultiLeaderConfig { private long checkpointGap = 500; private long allocateMemoryForConsensus; - public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch) { - this.maxRequestPerBatch = maxRequestPerBatch; + public Replication.Builder setMaxRequestNumPerBatch(int maxRequestNumPerBatch) { + this.maxRequestNumPerBatch = maxRequestNumPerBatch; + return this; + } + + public Builder setMaxSizePerBatch(int maxSizePerBatch) { + this.maxSizePerBatch = maxSizePerBatch; return this; } @@ -316,6 +330,11 @@ public class MultiLeaderConfig { return this; } + public Builder setCheckpointGap(long checkpointGap) { + this.checkpointGap = checkpointGap; + return this; + } + public Replication.Builder setAllocateMemoryForConsensus(long allocateMemoryForConsensus) { this.allocateMemoryForConsensus = allocateMemoryForConsensus; return this; @@ -323,7 +342,8 @@ public class MultiLeaderConfig { public Replication build() { return new Replication( - maxRequestPerBatch, + maxRequestNumPerBatch, + maxSizePerBatch, maxPendingBatch, maxWaitingTimeForAccumulatingBatchInMs, basicRetryWaitTimeMs, diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java index e965973459..46f22472aa 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java @@ -99,7 +99,7 @@ public class MultiLeaderServerImpl { private final LogDispatcher logDispatcher; private final MultiLeaderConfig config; private final ConsensusReqReader reader; - private boolean active; + private volatile boolean active; private String latestSnapshotId; private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager; private final MultiLeaderServerMetrics metrics; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 606731e6a7..4880e2aae4 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -59,6 +58,7 @@ import java.util.stream.Collectors; /** Manage all asynchronous replication threads and corresponding async clients */ public class LogDispatcher { + private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class); private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L; private final MultiLeaderServerImpl impl; @@ -156,7 +156,7 @@ public class LogDispatcher { } public void offer(IndexedConsensusRequest request) { - List<ByteBuffer> serializedRequests = request.buildSerializedRequests(); + request.buildSerializedRequests(); // we put the serialization step outside the synchronized block because it is stateless and // time-consuming synchronized (this) { @@ -167,8 +167,7 @@ public class LogDispatcher { impl.getThisNode().getGroupId(), thread.getPeer().getEndpoint().getIp(), thread.getPendingRequestSize()); - if (!thread.offer( - new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) { + if (!thread.offer(request)) { logger.debug( "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}", impl.getThisNode().getGroupId(), @@ -180,6 +179,7 @@ public class LogDispatcher { } public class LogDispatcherThread implements Runnable { + private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10; private static final long START_INDEX = 1; private final MultiLeaderConfig config; @@ -198,7 +198,7 @@ public class LogDispatcher { MultiLeaderMemoryManager.getInstance(); private volatile boolean stopped = false; - private ConsensusReqReader.ReqIterator walEntryIterator; + private final ConsensusReqReader.ReqIterator walEntryIterator; private final LogDispatcherThreadMetrics metrics; @@ -300,7 +300,7 @@ public class LogDispatcher { if (request != null) { bufferedRequest.add(request); // If write pressure is low, we simply sleep a little to reduce the number of RPC - if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) { + if (pendingRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) { Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs()); } } @@ -342,12 +342,10 @@ public class LogDispatcher { } public PendingBatch getBatch() { - PendingBatch batch; - List<TLogBatch> logBatches = new ArrayList<>(); long startIndex = syncStatus.getNextSendingIndex(); long maxIndexWhenBufferedRequestEmpty = startIndex; logger.debug("[GetBatch] startIndex: {}", startIndex); - if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) { + if (bufferedRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) { // Use drainTo instead of poll to reduce lock overhead logger.debug( "{} : pendingRequest Size: {}, bufferedRequest size: {}", @@ -357,7 +355,7 @@ public class LogDispatcher { synchronized (impl.getIndexObject()) { pendingRequest.drainTo( bufferedRequest, - config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size()); + config.getReplication().getMaxRequestNumPerBatch() - bufferedRequest.size()); maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1; } // remove all request that searchIndex < startIndex @@ -372,48 +370,57 @@ public class LogDispatcher { } } } + PendingBatch batches = new PendingBatch(config); // This condition will be executed in several scenarios: // 1. restart // 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed // up. To prevent inconsistency here, we use the synchronized logic when calculate value of // `maxIndexWhenBufferedRequestEmpty` if (bufferedRequest.isEmpty()) { - constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches); - batch = new PendingBatch(logBatches); + constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, batches); + batches.buildIndex(); logger.debug( - "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch); + "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batches); } else { // Notice that prev searchIndex >= startIndex Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator(); IndexedConsensusRequest prev = iterator.next(); + // Prevents gap between logs. For example, some requests are not written into the queue when // the queue is full. In this case, requests need to be loaded from the WAL - constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches); - if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { - batch = new PendingBatch(logBatches); - logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch); - return batch; + constructBatchFromWAL(startIndex, prev.getSearchIndex(), batches); + if (!batches.canAccumulate()) { + batches.buildIndex(); + logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batches); + return batches; } - constructBatchIndexedFromConsensusRequest(prev, logBatches); + + constructBatchIndexedFromConsensusRequest(prev, batches); iterator.remove(); releaseReservedMemory(prev); - while (iterator.hasNext() - && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) { + if (!batches.canAccumulate()) { + batches.buildIndex(); + logger.debug( + "{} : accumulated a {} from queue", impl.getThisNode().getGroupId(), batches); + return batches; + } + + while (iterator.hasNext() && batches.canAccumulate()) { IndexedConsensusRequest current = iterator.next(); // Prevents gap between logs. For example, some logs are not written into the queue when // the queue is full. In this case, requests need to be loaded from the WAL if (current.getSearchIndex() != prev.getSearchIndex() + 1) { - constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches); - if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { - batch = new PendingBatch(logBatches); + constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), batches); + if (!batches.canAccumulate()) { + batches.buildIndex(); logger.debug( "gap {} : accumulated a {} from queue and wal when gap", impl.getThisNode().getGroupId(), - batch); - return batch; + batches); + return batches; } } - constructBatchIndexedFromConsensusRequest(current, logBatches); + constructBatchIndexedFromConsensusRequest(current, batches); prev = current; // We might not be able to remove all the elements in the bufferedRequest in the // current function, but that's fine, we'll continue processing these elements in the @@ -421,11 +428,11 @@ public class LogDispatcher { iterator.remove(); releaseReservedMemory(current); } - batch = new PendingBatch(logBatches); + batches.buildIndex(); logger.debug( - "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch); + "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batches); } - return batch; + return batches; } public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler) { @@ -450,8 +457,7 @@ public class LogDispatcher { return syncStatus; } - private long constructBatchFromWAL( - long currentIndex, long maxIndex, List<TLogBatch> logBatches) { + private void constructBatchFromWAL(long currentIndex, long maxIndex, PendingBatch logBatches) { logger.debug( String.format( "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d", @@ -460,8 +466,7 @@ public class LogDispatcher { long targetIndex = currentIndex; // Even if there is no WAL files, these code won't produce error. walEntryIterator.skipTo(targetIndex); - while (targetIndex < maxIndex - && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) { + while (targetIndex < maxIndex && logBatches.canAccumulate()) { logger.debug("construct from WAL for one Entry, index : {}", targetIndex); try { walEntryIterator.waitForNextReady(); @@ -490,19 +495,16 @@ public class LogDispatcher { targetIndex = data.getSearchIndex() + 1; // construct request from wal for (IConsensusRequest innerRequest : data.getRequests()) { - logBatches.add( + logBatches.addTLogBatch( new TLogBatch(innerRequest.serializeToByteBuffer(), data.getSearchIndex(), true)); } } - return logBatches.size() > 0 - ? logBatches.get(logBatches.size() - 1).searchIndex - : currentIndex; } private void constructBatchIndexedFromConsensusRequest( - IndexedConsensusRequest request, List<TLogBatch> logBatches) { + IndexedConsensusRequest request, PendingBatch logBatches) { for (ByteBuffer innerRequest : request.getSerializedRequests()) { - logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false)); + logBatches.addTLogBatch(new TLogBatch(innerRequest, request.getSearchIndex(), false)); } } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java index a6ee43392f..920e781b09 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java @@ -19,28 +19,48 @@ package org.apache.iotdb.consensus.multileader.logdispatcher; +import org.apache.iotdb.consensus.config.MultiLeaderConfig; import org.apache.iotdb.consensus.multileader.thrift.TLogBatch; +import java.util.ArrayList; import java.util.List; public class PendingBatch { - private final long startIndex; - private final long endIndex; - private final List<TLogBatch> batches; + private final MultiLeaderConfig config; + + private long startIndex; + private long endIndex; + + private final List<TLogBatch> batches = new ArrayList<>(); + + private long serializedSize; // indicates whether this batch has been successfully synchronized to another node private boolean synced; - public PendingBatch(List<TLogBatch> batches) { + public PendingBatch(MultiLeaderConfig config) { + this.config = config; + } + + /* + Note: this method must be called once after all the `addTLogBatch` functions have been called + */ + public void buildIndex() { if (!batches.isEmpty()) { this.startIndex = batches.get(0).getSearchIndex(); this.endIndex = batches.get(batches.size() - 1).getSearchIndex(); - } else { - this.startIndex = 0; - this.endIndex = 0; } - this.batches = batches; - this.synced = false; + } + + public void addTLogBatch(TLogBatch batch) { + batches.add(batch); + // TODO Maybe we need to add in additional fields for more accurate calculations + serializedSize += batch.getData() == null ? 0 : batch.getData().length; + } + + public boolean canAccumulate() { + return batches.size() < config.getReplication().getMaxRequestNumPerBatch() + && serializedSize < config.getReplication().getMaxSizePerBatch(); } public long getStartIndex() { @@ -76,6 +96,8 @@ public class PendingBatch { + endIndex + ", size=" + batches.size() + + ", serializedSize=" + + serializedSize + '}'; } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java index e9901d931a..3549c1158b 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java @@ -30,7 +30,7 @@ public class SyncStatus { private final MultiLeaderConfig config; private final IndexController controller; - private final List<PendingBatch> pendingBatches = new LinkedList<>(); + private final LinkedList<PendingBatch> pendingBatches = new LinkedList<>(); public SyncStatus(IndexController controller, MultiLeaderConfig config) { this.controller = controller; @@ -80,7 +80,7 @@ public class SyncStatus { return 1 + (pendingBatches.isEmpty() ? controller.getCurrentIndex() - : pendingBatches.get(pendingBatches.size() - 1).getEndIndex()); + : pendingBatches.getLast().getEndIndex()); } } diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java index c4363428fb..bec9c6c955 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java @@ -31,7 +31,6 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -66,7 +65,9 @@ public class SyncStatusTest { for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(i); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build()); + batch.addTLogBatch(logBatch); + batch.buildIndex(); batchList.add(batch); status.addNextBatch(batch); } @@ -95,7 +96,9 @@ public class SyncStatusTest { for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(i); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build()); + batch.addTLogBatch(logBatch); + batch.buildIndex(); batchList.add(batch); status.addNextBatch(batch); } @@ -130,7 +133,9 @@ public class SyncStatusTest { for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(i); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build()); + batch.addTLogBatch(logBatch); + batch.buildIndex(); batchList.add(batch); status.addNextBatch(batch); } @@ -176,7 +181,9 @@ public class SyncStatusTest { for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(i); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build()); + batch.addTLogBatch(logBatch); + batch.buildIndex(); batchList.add(batch); status.addNextBatch(batch); } @@ -195,7 +202,9 @@ public class SyncStatusTest { () -> { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(config.getReplication().getMaxPendingBatch()); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build()); + batch.addTLogBatch(logBatch); + batch.buildIndex(); batchList.add(batch); try { status.addNextBatch(batch); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java index be8e90b914..6ce2eeff80 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java @@ -92,7 +92,7 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod List<InsertTabletNode> insertTabletNodeList; /** record the result of insert tablets */ - private Map<Integer, TSStatus> results = new HashMap<>(); + private final Map<Integer, TSStatus> results = new HashMap<>(); public InsertMultiTabletsNode(PlanNodeId id) { super(id);
