This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a87fc6dd51c IoTV2: Refine receiver and sender transfer logic to
prevent stuck (#15569)
a87fc6dd51c is described below
commit a87fc6dd51c94ceae1ac4fda53e4821c35a85712
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed May 28 11:40:23 2025 +0800
IoTV2: Refine receiver and sender transfer logic to prevent stuck (#15569)
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../apache/iotdb/consensus/ConsensusFactory.java | 4 +-
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 7 +-
.../pipeconsensus/PipeConsensusAsyncConnector.java | 350 ++++++++++++++-------
.../pipeconsensus/PipeConsensusSyncConnector.java | 4 +-
...r.java => PipeConsensusDeleteEventHandler.java} | 64 ++--
.../PipeConsensusTabletBatchEventHandler.java | 2 +-
.../PipeConsensusTabletInsertionEventHandler.java | 21 +-
.../PipeConsensusTsFileInsertionEventHandler.java | 10 +
.../PipeConsensusTransferBatchReqBuilder.java | 6 +-
.../pipeconsensus/PipeConsensusReceiver.java | 138 +++++---
...ner.java => IoTV2GlobalComponentContainer.java} | 45 ++-
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../iotdb/commons/pipe/event/EnrichedEvent.java | 10 +
.../pipe/receiver/PipeReceiverStatusHandler.java | 5 +
.../org/apache/iotdb/commons/utils/RetryUtils.java | 12 +
16 files changed, 457 insertions(+), 223 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d49fbd04bb1..d054a31e74d 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -319,6 +319,7 @@ public enum TSStatusCode {
CONSENSUS_GROUP_NOT_EXIST(2206),
RATIS_READ_UNAVAILABLE(2207),
PIPE_CONSENSUS_CLOSE_ERROR(2208),
+ PIPE_CONSENSUS_WAIT_ORDER_TIMEOUT(2209),
;
private final int statusCode;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 6196b37f123..a9dd2c1e5ea 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.consensus;
-import
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager;
@@ -55,7 +55,7 @@ public class ConsensusFactory {
if (className.equals(IOT_CONSENSUS_V2)) {
className = REAL_PIPE_CONSENSUS;
// initialize iotConsensusV2's thrift component
- PipeConsensusClientMgrContainer.build();
+ IoTV2GlobalComponentContainer.build();
// initialize iotConsensusV2's metric component
PipeConsensusSyncLagManager.build();
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 045b6db26ec..e7a10d3b9d3 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
-import
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
@@ -121,9 +121,9 @@ public class PipeConsensus implements IConsensus {
this.consensusPipeGuardian =
config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
this.asyncClientManager =
-
PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
+
IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
this.syncClientManager =
-
PipeConsensusClientMgrContainer.getInstance().getGlobalSyncClientManager();
+
IoTV2GlobalComponentContainer.getInstance().getGlobalSyncClientManager();
}
@Override
@@ -238,6 +238,7 @@ public class PipeConsensus implements IConsensus {
registerManager.deregisterAll();
consensusPipeGuardian.stop();
stateMachineMap.values().parallelStream().forEach(PipeConsensusServerImpl::stop);
+ IoTV2GlobalComponentContainer.getInstance().stopBackgroundTaskService();
}
private void checkAllConsensusPipe() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 49a9241c1e3..0d3fc73bfc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
-import
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
@@ -40,17 +40,19 @@ import
org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusDeleteEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletBatchEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTsFileInsertionEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
+import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusDeleteNodeReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
+import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.annotation.TableModel;
@@ -67,11 +69,14 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -95,10 +100,14 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS =
IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
- private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
+ private final Queue<EnrichedEvent> retryEventQueue =
+ new PriorityQueue<>(
+ IOTDB_CONFIG.getIotConsensusV2PipelineSize(),
+ Comparator.comparingLong(EnrichedEvent::getReplicateIndexForIoTV2));
// We use enrichedEvent here to make use of
EnrichedEvent.equalsInPipeConsensus
private final BlockingQueue<EnrichedEvent> transferBuffer =
new LinkedBlockingDeque<>(IOTDB_CONFIG.getIotConsensusV2PipelineSize());
+ private ScheduledExecutorService backgroundTaskService;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final int thisDataNodeId =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
@@ -147,7 +156,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
nodeUrls, consensusGroupId, thisDataNodeId,
pipeConsensusConnectorMetrics);
retryConnector.customize(parameters, configuration);
asyncTransferClientManager =
-
PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
+
IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
if (isTabletBatchModeEnabled) {
tabletBatchBuilder =
@@ -159,6 +168,8 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
// currently, tablet batch is false by default in PipeConsensus;
isTabletBatchModeEnabled = false;
+ this.backgroundTaskService =
+ IoTV2GlobalComponentContainer.getInstance().getBackgroundTaskService();
}
/**
@@ -248,7 +259,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
- syncTransferQueuedEventsIfNecessary();
+ asyncTransferQueuedEventsIfNecessary();
boolean enqueueResult = addEvent2Buffer((EnrichedEvent)
tabletInsertionEvent);
if (!enqueueResult) {
@@ -267,45 +278,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
tabletBatchBuilder.onSuccess();
}
} else {
- TCommitId tCommitId;
- TConsensusGroupId tConsensusGroupId =
- new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
- // tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
- final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
- (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
- tCommitId =
- new TCommitId(
- pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
-
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
- pipeInsertNodeTabletInsertionEvent.getRebootTimes());
-
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
- PipeConsensusAsyncConnector.class.getName())) {
- return;
- }
-
- final InsertNode insertNode =
- pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
- final ProgressIndex progressIndex =
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
- final TPipeConsensusTransferReq pipeConsensusTransferReq =
- Objects.isNull(insertNode)
- ? PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
- tCommitId,
- tConsensusGroupId,
- progressIndex,
- thisDataNodeId)
- : PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
- insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId);
- final PipeConsensusTabletInsertNodeEventHandler
pipeConsensusInsertNodeReqHandler =
- new PipeConsensusTabletInsertNodeEventHandler(
- pipeInsertNodeTabletInsertionEvent,
- pipeConsensusTransferReq,
- this,
- pipeConsensusConnectorMetrics);
-
- transfer(pipeConsensusInsertNodeReqHandler);
+ transferInEventWithoutCheck((PipeInsertionEvent) tabletInsertionEvent);
}
}
@@ -321,6 +294,50 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
}
}
+ private boolean transferInEventWithoutCheck(PipeInsertionEvent
tabletInsertionEvent)
+ throws Exception {
+ // tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+ final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
+ (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+ PipeConsensusAsyncConnector.class.getName())) {
+ return false;
+ }
+
+ TCommitId tCommitId;
+ TConsensusGroupId tConsensusGroupId =
+ new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
+ tCommitId =
+ new TCommitId(
+ pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
+
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
+ pipeInsertNodeTabletInsertionEvent.getRebootTimes());
+
+ final InsertNode insertNode =
+ pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
+ final ProgressIndex progressIndex =
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
+ final TPipeConsensusTransferReq pipeConsensusTransferReq =
+ Objects.isNull(insertNode)
+ ? PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
+ tCommitId,
+ tConsensusGroupId,
+ progressIndex,
+ thisDataNodeId)
+ : PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+ insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId);
+ final PipeConsensusTabletInsertNodeEventHandler
pipeConsensusInsertNodeReqHandler =
+ new PipeConsensusTabletInsertNodeEventHandler(
+ pipeInsertNodeTabletInsertionEvent,
+ pipeConsensusTransferReq,
+ this,
+ pipeConsensusConnectorMetrics);
+
+ transfer(pipeConsensusInsertNodeReqHandler);
+ return true;
+ }
+
private void transfer(
final PipeConsensusTabletInsertNodeEventHandler
pipeConsensusInsertNodeReqHandler) {
AsyncPipeConsensusServiceClient client = null;
@@ -335,7 +352,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
- syncTransferQueuedEventsIfNecessary();
+ asyncTransferQueuedEventsIfNecessary();
transferBatchedEventsIfNecessary();
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
@@ -350,8 +367,19 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
}
+
+ transferWithoutCheck(tsFileInsertionEvent);
+ }
+
+ private boolean transferWithoutCheck(TsFileInsertionEvent
tsFileInsertionEvent) throws Exception {
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+ PipeConsensusAsyncConnector.class.getName())) {
+ return false;
+ }
+
TCommitId tCommitId =
new TCommitId(
pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(),
@@ -359,11 +387,6 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
pipeTsFileInsertionEvent.getRebootTimes());
TConsensusGroupId tConsensusGroupId =
new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeTsFileInsertionEvent.increaseReferenceCount(
- PipeConsensusAsyncConnector.class.getName())) {
- return;
- }
try {
// Just in case. To avoid the case that exception occurred when
constructing the handler.
@@ -382,6 +405,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
pipeConsensusConnectorMetrics);
transfer(pipeConsensusTsFileInsertionEventHandler);
+ return true;
} catch (Exception e) {
// Just in case. To avoid the case that exception occurred when
constructing the handler.
pipeTsFileInsertionEvent.decreaseReferenceCount(
@@ -408,7 +432,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
*/
@Override
public void transfer(Event event) throws Exception {
- syncTransferQueuedEventsIfNecessary();
+ asyncTransferQueuedEventsIfNecessary();
transferBatchedEventsIfNecessary();
// Transfer deletion
@@ -419,10 +443,8 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
}
- retryConnector.transfer(event);
- // Since transfer method will throw an exception if transfer failed,
removeEventFromBuffer
- // will not be executed when transfer failed.
- this.removeEventFromBuffer(deleteDataNodeEvent);
+
+ transferDeletion(deleteDataNodeEvent);
return;
}
@@ -432,6 +454,48 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
}
}
+ private boolean transferDeletion(PipeDeleteDataNodeEvent
pipeDeleteDataNodeEvent) {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeDeleteDataNodeEvent.increaseReferenceCount(
+ PipeConsensusSyncConnector.class.getName())) {
+ return false;
+ }
+
+ final ProgressIndex progressIndex =
pipeDeleteDataNodeEvent.getProgressIndex();
+ final TCommitId tCommitId =
+ new TCommitId(
+ pipeDeleteDataNodeEvent.getReplicateIndexForIoTV2(),
+ pipeDeleteDataNodeEvent.getCommitterKey().getRestartTimes(),
+ pipeDeleteDataNodeEvent.getRebootTimes());
+ final TConsensusGroupId tConsensusGroupId =
+ new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
+
+ final TPipeConsensusTransferReq pipeConsensusTransferReq =
+ PipeConsensusDeleteNodeReq.toTPipeConsensusTransferReq(
+ pipeDeleteDataNodeEvent.getDeleteDataNode(),
+ tCommitId,
+ tConsensusGroupId,
+ progressIndex,
+ thisDataNodeId);
+ final PipeConsensusDeleteEventHandler pipeConsensusDeleteEventHandler =
+ new PipeConsensusDeleteEventHandler(
+ pipeDeleteDataNodeEvent, pipeConsensusTransferReq, this,
pipeConsensusConnectorMetrics);
+
+ transfer(pipeConsensusDeleteEventHandler);
+ return true;
+ }
+
+ private void transfer(final PipeConsensusDeleteEventHandler
pipeConsensusDeleteEventHandler) {
+ AsyncPipeConsensusServiceClient client = null;
+ try {
+ client = asyncTransferClientManager.borrowClient(getFollowerUrl());
+ pipeConsensusDeleteEventHandler.transfer(client);
+ } catch (final Exception ex) {
+ logOnClientException(client, ex);
+ pipeConsensusDeleteEventHandler.onError(ex);
+ }
+ }
+
/** Try its best to commit data in order. Flush can also be a trigger to
transfer batched data. */
private void transferBatchedEventsIfNecessary() throws IOException {
if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
@@ -444,88 +508,136 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
tabletBatchBuilder.onSuccess();
}
- /**
- * Transfer queued {@link Event}s which are waiting for retry.
- *
- * @throws Exception if an error occurs. The error will be handled by pipe
framework, which will
- * retry the {@link Event} and mark the {@link Event} as failure and
stop the pipe if the
- * retry times exceeds the threshold.
- */
- private void syncTransferQueuedEventsIfNecessary() throws Exception {
+ /** Transfer queued {@link Event}s which are waiting for retry. */
+ private void asyncTransferQueuedEventsIfNecessary() {
+ long retryStartTime = System.currentTimeMillis();
while (!retryEventQueue.isEmpty()) {
synchronized (this) {
if (isClosed.get() || retryEventQueue.isEmpty()) {
return;
}
-
- final Event peekedEvent = retryEventQueue.peek();
- // do transfer
- if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- retryConnector.transfer((PipeInsertNodeTabletInsertionEvent)
peekedEvent);
- } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
- retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
- } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
- retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
- } else {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn(
- "PipeConsensusAsyncConnector does not support transfer generic
event: {}.",
- peekedEvent);
- }
- }
- // release resource
- if (peekedEvent instanceof EnrichedEvent) {
- ((EnrichedEvent) peekedEvent)
-
.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), true);
+ if (System.currentTimeMillis() - retryStartTime >
TimeUnit.SECONDS.toMillis(20)) {
+ // just in case that some events are polled and re-added into queue
again and again,
+ // causing this loop to run forever.
+ LOGGER.warn(
+ "PipeConsensus-ConsensusGroup-{}: retryEventQueue is not empty
after 20 seconds. retryQueue size: {}",
+ consensusGroupId,
+ retryEventQueue.size());
+ return;
}
- final Event polledEvent = retryEventQueue.poll();
- if (polledEvent != peekedEvent) {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.error(
- "The event polled from the queue is not the same as the event
peeked from the queue. "
- + "Peeked event: {}, polled event: {}.",
- peekedEvent,
- polledEvent);
- }
- }
- if (polledEvent != null) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Polled event {} from retry queue.", polledEvent);
- }
- // poll it from transferBuffer
- removeEventFromBuffer((EnrichedEvent) polledEvent);
- }
+ // remove this event from queue. If retry fail as well, event will be
re-added into
+ // retryQueue.
+ final EnrichedEvent peekedEvent = retryEventQueue.poll();
+ // retry with interval when necessarily
+ long retryInterval =
+ peekedEvent.getRetryInterval() >
EnrichedEvent.INITIAL_RETRY_INTERVAL_FOR_IOTV2
+ ? peekedEvent.getRetryInterval()
+ : 0L;
+ LOGGER.info(
+ "PipeConsensus-ConsensusGroup-{}: retry with interval {} for {}",
+ consensusGroupId,
+ retryInterval,
+ peekedEvent);
+ // need to retry in background service, otherwise the retryInterval
will block the sender
+ // procedure.
+ backgroundTaskService.schedule(
+ () -> {
+ // do transfer
+ if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ retryTransfer((PipeInsertNodeTabletInsertionEvent)
peekedEvent);
+ } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
+ retryTransfer((PipeTsFileInsertionEvent) peekedEvent);
+ } else if (peekedEvent instanceof PipeDeleteDataNodeEvent) {
+ retryTransfer((PipeDeleteDataNodeEvent) peekedEvent);
+ } else {
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn(
+ "PipeConsensusAsyncConnector does not support transfer
generic event: {}.",
+ peekedEvent);
+ }
+ }
+ },
+ retryInterval,
+ TimeUnit.MILLISECONDS);
}
}
}
+ private void retryTransfer(final PipeInsertionEvent tabletInsertionEvent) {
+ // TODO: batch transfer
+ try {
+ if (transferInEventWithoutCheck(tabletInsertionEvent)) {
+ tabletInsertionEvent.decreaseReferenceCount(
+ PipeConsensusAsyncConnector.class.getName(), false);
+ } else {
+ addFailureEventToRetryQueue(tabletInsertionEvent);
+ }
+ } catch (final Exception e) {
+ tabletInsertionEvent.decreaseReferenceCount(
+ PipeConsensusAsyncConnector.class.getName(), false);
+ addFailureEventToRetryQueue(tabletInsertionEvent);
+ }
+ }
+
+ private void retryTransfer(final PipeTsFileInsertionEvent
tsFileInsertionEvent) {
+ try {
+ if (transferWithoutCheck(tsFileInsertionEvent)) {
+ tsFileInsertionEvent.decreaseReferenceCount(
+ PipeConsensusAsyncConnector.class.getName(), false);
+ } else {
+ addFailureEventToRetryQueue(tsFileInsertionEvent);
+ }
+ } catch (final Exception e) {
+ tsFileInsertionEvent.decreaseReferenceCount(
+ PipeConsensusAsyncConnector.class.getName(), false);
+ addFailureEventToRetryQueue(tsFileInsertionEvent);
+ }
+ }
+
+ private void retryTransfer(final PipeDeleteDataNodeEvent
deleteDataNodeEvent) {
+ try {
+ if (transferDeletion(deleteDataNodeEvent)) {
+ deleteDataNodeEvent.decreaseReferenceCount(
+ PipeConsensusAsyncConnector.class.getName(), false);
+ } else {
+ addFailureEventToRetryQueue(deleteDataNodeEvent);
+ }
+ } catch (final Exception e) {
+ deleteDataNodeEvent.decreaseReferenceCount(
+ PipeConsensusAsyncConnector.class.getName(), false);
+ addFailureEventToRetryQueue(deleteDataNodeEvent);
+ }
+ }
+
/**
* Add failure event to retry queue.
*
* @param event event to retry
*/
@SuppressWarnings("java:S899")
- public void addFailureEventToRetryQueue(final Event event) {
+ public void addFailureEventToRetryQueue(final EnrichedEvent event) {
+ if (event.isReleased()) {
+ return;
+ }
+
if (isClosed.get()) {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
- }
+ event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
+ return;
+ }
+ // just in case
+ if (retryEventQueue.contains(event)) {
return;
}
retryEventQueue.offer(event);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be
added to retry queue.",
- consensusGroupId,
- event);
- }
+ LOGGER.info(
+ "PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be
added to retry queue.",
+ consensusGroupId,
+ event);
if (isClosed.get()) {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
- }
+ event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
}
}
@@ -534,18 +646,16 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
*
* @param events events to retry
*/
- public void addFailureEventsToRetryQueue(final Iterable<Event> events) {
- for (final Event event : events) {
+ public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent>
events) {
+ for (final EnrichedEvent event : events) {
addFailureEventToRetryQueue(event);
}
}
public synchronized void clearRetryEventsReferenceCount() {
while (!retryEventQueue.isEmpty()) {
- final Event event = retryEventQueue.poll();
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
- }
+ final EnrichedEvent event = retryEventQueue.poll();
+ event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index d5ba9d04120..02a632ec84d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
-import
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
@@ -97,7 +97,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
this.consensusGroupId = consensusGroupId;
this.thisDataNodeId = thisDataNodeId;
this.syncRetryClientManager =
-
PipeConsensusClientMgrContainer.getInstance().getGlobalSyncClientManager();
+
IoTV2GlobalComponentContainer.getInstance().getGlobalSyncClientManager();
this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java
similarity index 63%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java
index 609e7e5e509..aa0734c883e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java
@@ -21,13 +21,12 @@ package
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
-import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertionEventHandler;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -36,23 +35,23 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class PipeConsensusTabletInsertionEventHandler<E extends
TPipeConsensusTransferResp>
- implements AsyncMethodCallback<E> {
+public class PipeConsensusDeleteEventHandler
+ implements AsyncMethodCallback<TPipeConsensusTransferResp> {
private static final Logger LOGGER =
- LoggerFactory.getLogger(PipeConsensusTabletInsertionEventHandler.class);
+ LoggerFactory.getLogger(PipeConsensusDeleteEventHandler.class);
- protected final TabletInsertionEvent event;
+ private final PipeDeleteDataNodeEvent event;
- protected final TPipeConsensusTransferReq req;
+ private final TPipeConsensusTransferReq req;
- protected final PipeConsensusAsyncConnector connector;
+ private final PipeConsensusAsyncConnector connector;
- protected final PipeConsensusConnectorMetrics metric;
+ private final PipeConsensusConnectorMetrics metric;
private final long createTime;
- protected PipeConsensusTabletInsertionEventHandler(
- TabletInsertionEvent event,
+ public PipeConsensusDeleteEventHandler(
+ PipeDeleteDataNodeEvent event,
TPipeConsensusTransferReq req,
PipeConsensusAsyncConnector connector,
PipeConsensusConnectorMetrics metric) {
@@ -64,12 +63,9 @@ public abstract class
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
}
public void transfer(AsyncPipeConsensusServiceClient client) throws
TException {
- doTransfer(client, req);
+ client.pipeConsensusTransfer(req, this);
}
- protected abstract void doTransfer(
- AsyncPipeConsensusServiceClient client, TPipeConsensusTransferReq req)
throws TException;
-
@Override
public void onComplete(TPipeConsensusTransferResp response) {
// Just in case
@@ -85,16 +81,13 @@ public abstract class
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
connector.statusHandler().handle(status, status.getMessage(),
event.toString());
}
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event)
-
.decreaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName(),
true);
- }
+
event.decreaseReferenceCount(PipeConsensusDeleteEventHandler.class.getName(),
true);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
- "InsertNodeTransfer: no.{} event successfully processed!",
- ((EnrichedEvent) event).getReplicateIndexForIoTV2());
- connector.removeEventFromBuffer((EnrichedEvent) event);
+ "DeleteNodeTransfer: no.{} event successfully processed!",
+ event.getReplicateIndexForIoTV2());
+ connector.removeEventFromBuffer(event);
}
long duration = System.nanoTime() - createTime;
@@ -105,16 +98,23 @@ public abstract class
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
}
@Override
- public void onError(Exception exception) {
+ public void onError(Exception e) {
LOGGER.warn(
- "Failed to transfer TabletInsertionEvent {} (committer key={},
replicate index={}).",
- event instanceof EnrichedEvent
- ? ((EnrichedEvent) event).coreReportMessage()
- : event.toString(),
- event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
- event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getReplicateIndexForIoTV2() : null,
- exception);
-
+ "Failed to transfer PipeDeleteNodeEvent {} (committer key={},
replicate index={}).",
+ event.coreReportMessage(),
+ event.getCommitterKey(),
+ event.getReplicateIndexForIoTV2(),
+ e);
+
+ if (RetryUtils.needRetryWithIncreasingInterval(e)) {
+ // just in case for overflow
+ if (event.getRetryInterval() << 2 <= 0) {
+ event.setRetryInterval(1000L * 20);
+ } else {
+ event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval()
<< 2));
+ }
+ }
+ // IoTV2 ensures that only use PipeInsertionEvent, which is definitely
EnrichedEvent.
connector.addFailureEventToRetryQueue(event);
metric.recordRetryCounter();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
index a2c6406b4a1..31677862579 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
@@ -46,7 +46,7 @@ public class PipeConsensusTabletBatchEventHandler
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusTabletBatchEventHandler.class);
private final List<Long> requestCommitIds;
- private final List<Event> events;
+ private final List<EnrichedEvent> events;
private final TPipeConsensusBatchTransferReq req;
private final PipeConsensusAsyncConnector connector;
private final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
index 609e7e5e509..482fea76140 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
@@ -106,16 +107,24 @@ public abstract class
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
@Override
public void onError(Exception exception) {
+ EnrichedEvent event = (EnrichedEvent) this.event;
LOGGER.warn(
"Failed to transfer TabletInsertionEvent {} (committer key={},
replicate index={}).",
- event instanceof EnrichedEvent
- ? ((EnrichedEvent) event).coreReportMessage()
- : event.toString(),
- event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
- event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getReplicateIndexForIoTV2() : null,
+ event.coreReportMessage(),
+ event.getCommitterKey(),
+ event.getReplicateIndexForIoTV2(),
exception);
- connector.addFailureEventToRetryQueue(event);
+ if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
+ // just in case for overflow
+ if (event.getRetryInterval() << 2 <= 0) {
+ event.setRetryInterval(1000L * 20);
+ } else {
+ event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval()
<< 2));
+ }
+ }
+ // IoTV2 ensures that only use PipeInsertionEvent, which is definitely
EnrichedEvent.
+ connector.addFailureEventToRetryQueue((EnrichedEvent) event);
metric.recordRetryCounter();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index e3e2bf0ba05..c7ee3e2c0bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
@@ -292,6 +293,15 @@ public class PipeConsensusTsFileInsertionEventHandler
event.getReplicateIndexForIoTV2(),
exception);
+ if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
+ // just in case for overflow
+ if (event.getRetryInterval() << 2 <= 0) {
+ event.setRetryInterval(1000L * 20);
+ } else {
+ event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval()
<< 2));
+ }
+ }
+
try {
if (reader != null) {
reader.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index cf004b65f4d..978646fe8fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -60,7 +60,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder
implements AutoClosea
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusTransferBatchReqBuilder.class);
- protected final List<Event> events = new ArrayList<>();
+ protected final List<EnrichedEvent> events = new ArrayList<>();
protected final List<Long> requestCommitIds = new ArrayList<>();
protected final List<TPipeConsensusTransferReq> batchReqs = new
ArrayList<>();
// limit in delayed time
@@ -138,7 +138,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder
implements AutoClosea
// The deduplication logic here is to avoid the accumulation of the same
event in a batch when
// retrying.
if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
- events.add(event);
+ events.add((EnrichedEvent) event);
requestCommitIds.add(requestCommitId);
final int bufferSize = buildTabletInsertionBuffer(event);
@@ -179,7 +179,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder
implements AutoClosea
return batchReqs.isEmpty();
}
- public List<Event> deepCopyEvents() {
+ public List<EnrichedEvent> deepCopyEvents() {
return new ArrayList<>(events);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index d76b956b4d3..acadcfc2d53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -101,7 +101,7 @@ public class PipeConsensusReceiver {
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private static final long PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS =
(long) IOTDB_CONFIG.getConnectionTimeoutInMS()
- / 6
+ / 3
* IOTDB_CONFIG.getIotConsensusV2PipelineSize();
private static final long CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS = 5000;
private static final long RETRY_WAIT_TIME = 500;
@@ -1397,8 +1397,11 @@ public class PipeConsensusReceiver {
* although events can arrive receiver in a random sequence.
*/
private class RequestExecutor {
- private static final String THIS_NODE = "this node";
- private static final String PIPE_TASK = "pipe task";
+ private static final String MSG_NODE_RESTART_INDEX_STALE =
+ "sender dn restarts before this event was sent here";
+ private static final String MSG_PIPE_RESTART_INDEX_STALE =
+ "pipe task restarts before this event was sent here";
+ private static final String MSG_STALE_REPLICATE_INDEX = "replicate index
is out dated";
// An ordered set that buffers transfer requests' TCommitId, whose length
is not larger than
// PIPE_CONSENSUS_PIPELINE_SIZE.
@@ -1419,6 +1422,7 @@ public class PipeConsensusReceiver {
this.reqExecutionOrderBuffer =
new TreeSet<>(
Comparator.comparingInt(RequestMeta::getDataNodeRebootTimes)
+ .thenComparingInt(RequestMeta::getPipeTaskRestartTimes)
.thenComparingLong(RequestMeta::getReplicateIndex));
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
@@ -1426,6 +1430,34 @@ public class PipeConsensusReceiver {
this.tsFileWriterPool = tsFileWriterPool;
}
+ private TPipeConsensusTransferResp preCheck(TCommitId tCommitId) {
+ // if a req is deprecated, we will discard it
+ // This case may happen in this scenario: leader has transferred {1,2}
and is intending to
+ // transfer {3, 4, 5, 6}. And in one moment, follower has received {4,
5, 6}, {3} is still
+ // transferring due to some network latency.
+ // At this time, leader restarts, and it will resend {3, 4, 5, 6} with
incremental
+ // rebootTimes. If the {3} sent before the leader restart arrives after
the follower
+ // receives
+ // the request with incremental rebootTimes, the {3} sent before the
leader restart needs to
+ // be discarded.
+ if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
+ return deprecatedResp(MSG_NODE_RESTART_INDEX_STALE, tCommitId);
+ }
+ // Similarly, check pipeTask restartTimes
+ if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+ && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
+ return deprecatedResp(MSG_PIPE_RESTART_INDEX_STALE, tCommitId);
+ }
+ // Similarly, check replicationIndex
+ if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+ && tCommitId.getPipeTaskRestartTimes() == pipeTaskRestartTimes
+ && tCommitId.getReplicateIndex() < onSyncedReplicateIndex + 1) {
+ return deprecatedResp(MSG_STALE_REPLICATE_INDEX, tCommitId);
+ }
+ // pass check
+ return null;
+ }
+
private TPipeConsensusTransferResp onRequest(
final TPipeConsensusTransferReq req,
final boolean isTransferTsFilePiece,
@@ -1442,27 +1474,15 @@ public class PipeConsensusReceiver {
TCommitId tCommitId = req.getCommitId();
RequestMeta requestMeta = new RequestMeta(tCommitId);
+ TPipeConsensusTransferResp preCheckRes = preCheck(tCommitId);
+ if (preCheckRes != null) {
+ return preCheckRes;
+ }
+
LOGGER.info(
"PipeConsensus-PipeName-{}: start to receive no.{} event",
consensusPipeName,
tCommitId);
- // if a req is deprecated, we will discard it
- // This case may happen in this scenario: leader has transferred {1,2}
and is intending to
- // transfer {3, 4, 5, 6}. And in one moment, follower has received {4,
5, 6}, {3} is still
- // transferring due to some network latency.
- // At this time, leader restarts, and it will resend {3, 4, 5, 6} with
incremental
- // rebootTimes. If the {3} sent before the leader restart arrives
after the follower
- // receives
- // the request with incremental rebootTimes, the {3} sent before the
leader restart needs to
- // be discarded.
- if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
- return deprecatedResp(THIS_NODE);
- }
- // Similarly, check pipeTask restartTimes
- if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
- && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
- return deprecatedResp(PIPE_TASK);
- }
// Judge whether connector has rebooted or not, if the rebootTimes
increases compared to
// connectorRebootTimes, need to reset receiver because connector has
been restarted.
if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
@@ -1560,31 +1580,52 @@ public class PipeConsensusReceiver {
// pipeTaskStartTimes or rebootTimes came in and refreshed the
requestBuffer. In that
// cases we need to discard these requests.
if (!reqExecutionOrderBuffer.contains(requestMeta)) {
- return deprecatedResp(String.format("%s or %s", THIS_NODE,
PIPE_TASK));
+ return deprecatedResp(
+ String.format(
+ "%s or %s", MSG_NODE_RESTART_INDEX_STALE,
MSG_PIPE_RESTART_INDEX_STALE),
+ tCommitId);
}
- // If the buffer is not full after waiting timeout, we suppose
that the sender will
- // not send any more events at this time, that is, the sender
has sent all events. At
- // this point we apply the event at reqBuffer's peek
- if (timeout
- && reqExecutionOrderBuffer.size() <
IOTDB_CONFIG.getIotConsensusV2PipelineSize()
- && reqExecutionOrderBuffer.first() != null
- && reqExecutionOrderBuffer.first().equals(requestMeta)) {
- // TODO: Turn it to debug after GA
- LOGGER.info(
- "PipeConsensus-PipeName-{}: no.{} event get executed after
awaiting timeout, current receiver syncIndex: {}",
- consensusPipeName,
- tCommitId,
- onSyncedReplicateIndex);
- long startApplyNanos = System.nanoTime();
- metric.recordDispatchWaitingTimer(startApplyNanos -
startDispatchNanos);
- requestMeta.setStartApplyNanos(startApplyNanos);
- TPipeConsensusTransferResp resp = loadEvent(req);
-
- if (resp != null
- && resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- onSuccess(tCommitId, isTransferTsFileSeal);
+ // After waiting timeout, we suppose that the sender will not
send any more events at
+ // this time, that is, the sender has sent all events. At this
point we apply the
+ // event at reqBuffer's peek
+ if (timeout && reqExecutionOrderBuffer.first() != null) {
+ // if current event is the first event in reqBuffer, we can
process it.
+ if (reqExecutionOrderBuffer.first().equals(requestMeta)) {
+ // TODO: Turn it to debug after GA
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: no.{} event get executed
after awaiting timeout, current receiver syncIndex: {}",
+ consensusPipeName,
+ tCommitId,
+ onSyncedReplicateIndex);
+ long startApplyNanos = System.nanoTime();
+ metric.recordDispatchWaitingTimer(startApplyNanos -
startDispatchNanos);
+ requestMeta.setStartApplyNanos(startApplyNanos);
+ TPipeConsensusTransferResp resp = loadEvent(req);
+
+ if (resp != null
+ && resp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ onSuccess(tCommitId, isTransferTsFileSeal);
+ }
+ return resp;
+ }
+ // if current event is not the first event in reqBuffer, we
should return an error
+ // code to let leader retry or proceed instead of getting
stuck in this while loop
+ // and block sender.
+ else {
+ final TSStatus status =
+ new TSStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_CONSENSUS_WAIT_ORDER_TIMEOUT,
+ "Waiting for the previous event times out,
returns an error to let the sender retry and continue scheduling."));
+ // TODO: Turn it to debug after GA
+ LOGGER.info(
+ "PipeConsensus-{}: Waiting for the previous event times
out, current peek {}, current id {}",
+ consensusPipeName,
+ reqExecutionOrderBuffer.first().commitId,
+ tCommitId);
+ return new TPipeConsensusTransferResp(status);
}
- return resp;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -1685,17 +1726,18 @@ public class PipeConsensusReceiver {
}
}
- private TPipeConsensusTransferResp deprecatedResp(String msg) {
+ private TPipeConsensusTransferResp deprecatedResp(String msg, TCommitId
tCommitId) {
final TSStatus status =
new TSStatus(
RpcUtils.getStatus(
TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
String.format(
- "PipeConsensus receiver received a deprecated request,
which may be sent before %s restarts. Consider to discard it",
+ "PipeConsensus receiver received a deprecated request,
which may because %s. Consider to discard it.",
msg)));
LOGGER.info(
- "PipeConsensus-PipeName-{}: received a deprecated request, which may
be sent before {} restarts. Consider to discard it",
+ "PipeConsensus-PipeName-{}: received a deprecated request-{}, which
may because {}. ",
consensusPipeName,
+ tCommitId,
msg);
return new TPipeConsensusTransferResp(status);
}
@@ -1713,6 +1755,10 @@ public class PipeConsensusReceiver {
return commitId.getDataNodeRebootTimes();
}
+ public int getPipeTaskRestartTimes() {
+ return commitId.getPipeTaskRestartTimes();
+ }
+
public long getReplicateIndex() {
return commitId.getReplicateIndex();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/IoTV2GlobalComponentContainer.java
similarity index 67%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/IoTV2GlobalComponentContainer.java
index 6ba0136280d..f6f66b9c9ef 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/IoTV2GlobalComponentContainer.java
@@ -26,24 +26,34 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.client.property.PipeConsensusClientProperty;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
- * This class is used to hold the syncClientManager and asyncClientManager
used by pipeConsensus.
- * The purpose of designing this class is that both the consensus layer and
the datanode layer of
- * pipeConsensus use clientManager.
+ * This class is used to hold the global component such as syncClientManager
and asyncClientManager
+ * used by pipeConsensus. The purpose of designing this class is that both the
consensus layer and
+ * the datanode layer of pipeConsensus use clientManager.
*
* <p>Note: we hope to create the corresponding clientManager only when the
consensus is
* pipeConsensus to avoid unnecessary overhead.
*/
-public class PipeConsensusClientMgrContainer {
+public class IoTV2GlobalComponentContainer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTV2GlobalComponentContainer.class);
private static final CommonConfig CONF =
CommonDescriptor.getInstance().getConfig();
private final PipeConsensusClientProperty config;
private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncClientManager;
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
+ private final ScheduledExecutorService backgroundTaskService;
- private PipeConsensusClientMgrContainer() {
+ private IoTV2GlobalComponentContainer() {
// load rpc client config
this.config =
PipeConsensusClientProperty.newBuilder()
@@ -57,6 +67,9 @@ public class PipeConsensusClientMgrContainer {
this.syncClientManager =
new IClientManager.Factory<TEndPoint, SyncPipeConsensusServiceClient>()
.createClientManager(new
SyncPipeConsensusServiceClientPoolFactory(config));
+ this.backgroundTaskService =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_CONSENSUS_BACKGROUND_TASK_EXECUTOR.getName());
}
public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
getGlobalAsyncClientManager() {
@@ -67,19 +80,35 @@ public class PipeConsensusClientMgrContainer {
return this.syncClientManager;
}
+ public ScheduledExecutorService getBackgroundTaskService() {
+ return this.backgroundTaskService;
+ }
+
+ public void stopBackgroundTaskService() {
+ backgroundTaskService.shutdownNow();
+ try {
+ if (!backgroundTaskService.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOGGER.warn("IoTV2 background service did not terminate within {}s",
30);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("IoTV2 background Thread still doesn't exit after 30s");
+ Thread.currentThread().interrupt();
+ }
+ }
+
private static class PipeConsensusClientMgrContainerHolder {
- private static PipeConsensusClientMgrContainer INSTANCE;
+ private static IoTV2GlobalComponentContainer INSTANCE;
private PipeConsensusClientMgrContainerHolder() {}
public static void build() {
if (INSTANCE == null) {
- INSTANCE = new PipeConsensusClientMgrContainer();
+ INSTANCE = new IoTV2GlobalComponentContainer();
}
}
}
- public static PipeConsensusClientMgrContainer getInstance() {
+ public static IoTV2GlobalComponentContainer getInstance() {
return PipeConsensusClientMgrContainerHolder.INSTANCE;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e208f0eb87a..076194dc3ae 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -106,6 +106,7 @@ public enum ThreadName {
ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL("AsyncDataNodePipeConsensusServiceClientPool"),
PIPE_CONSENSUS_DELETION_SERIALIZE("DAL-Serialize"),
PIPE_CONSENSUS_TSFILE_WRITER_CHECKER("PipeConsensus-TsFileWriter-Checker"),
+
PIPE_CONSENSUS_BACKGROUND_TASK_EXECUTOR("PipeConsensusBackgroundTaskExecutor"),
// -------------------------- IoTConsensus --------------------------
IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 6b4ae19464a..a64dd760688 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -64,6 +64,8 @@ public abstract class EnrichedEvent implements Event {
// Used in IoTConsensusV2
protected long replicateIndexForIoTV2 = NO_COMMIT_ID;
protected int rebootTimes = 0;
+ public static final long INITIAL_RETRY_INTERVAL_FOR_IOTV2 = 500L;
+ protected long retryInterval = INITIAL_RETRY_INTERVAL_FOR_IOTV2;
protected final TreePattern treePattern;
protected final TablePattern tablePattern;
@@ -429,6 +431,14 @@ public abstract class EnrichedEvent implements Event {
return rebootTimes;
}
+ public long getRetryInterval() {
+ return this.retryInterval;
+ }
+
+  public long setRetryInterval(final long retryInterval) {
+    return this.retryInterval = retryInterval;
+  }
+
public CommitterKey getCommitterKey() {
return committerKey;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 83665780f77..779956b21d0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -102,6 +102,11 @@ public class PipeReceiverStatusHandler {
exceptionMessage, Integer.MAX_VALUE);
}
+ if (RetryUtils.notNeedRetryForConsensus(status.getCode())) {
+ LOGGER.info("IoTConsensusV2: will not retry. status: {}", status);
+ return;
+ }
+
switch (status.getCode()) {
case 200: // SUCCESS_STATUS
case 400: // REDIRECTION_RECOMMEND
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index c0ea52e6b74..2f3b6715686 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -19,8 +19,11 @@
package org.apache.iotdb.commons.utils;
+import
org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.rpc.TSStatusCode;
+import java.net.ConnectException;
+
public class RetryUtils {
public interface CallableWithException<T, E extends Exception> {
@@ -33,6 +36,15 @@ public class RetryUtils {
|| statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
}
+ public static boolean needRetryWithIncreasingInterval(Exception e) {
+ return e instanceof ConnectException
+ || e instanceof PipeConsensusRetryWithIncreasingIntervalException;
+ }
+
+ public static boolean notNeedRetryForConsensus(int statusCode) {
+ return statusCode ==
TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST.getStatusCode();
+ }
+
public static final int MAX_RETRIES = 3;
public static <T, E extends Exception> T retryOnException(