This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c78546d27ef IoTV2: Receiver aware of pipe restart & Improve condition schedule (#15314)
c78546d27ef is described below
commit c78546d27ef185f82d8d9d8cb6210183fc6812cd
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 11 21:01:05 2025 +0800
IoTV2: Receiver aware of pipe restart & Improve condition schedule (#15314)
* receiver aware of pipe restart
* improve condition schedule
* fix comment
* improve log
---
.../pipeconsensus/PipeConsensusAsyncConnector.java | 2 +
.../pipeconsensus/PipeConsensusSyncConnector.java | 3 +
.../PipeConsensusTransferBatchReqBuilder.java | 1 +
.../pipeconsensus/PipeConsensusReceiver.java | 87 +++++++++++++++-------
.../src/main/thrift/pipeconsensus.thrift | 3 +-
5 files changed, 67 insertions(+), 29 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index a44c30fca63..1fdb12df8d8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -276,6 +276,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
tCommitId =
new TCommitId(
pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
+ pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
pipeInsertNodeTabletInsertionEvent.getRebootTimes());
// We increase the reference count for this event to determine if the
event may be released.
@@ -354,6 +355,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
TCommitId tCommitId =
new TCommitId(
pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(),
+ pipeTsFileInsertionEvent.getCommitterKey().getRestartTimes(),
pipeTsFileInsertionEvent.getRebootTimes());
TConsensusGroupId tConsensusGroupId =
new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index c92f99fb3ac..5365666a7a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -248,6 +248,7 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
final TCommitId tCommitId =
new TCommitId(
pipeDeleteDataNodeEvent.getReplicateIndexForIoTV2(),
+ pipeDeleteDataNodeEvent.getCommitterKey().getRestartTimes(),
pipeDeleteDataNodeEvent.getRebootTimes());
final TConsensusGroupId tConsensusGroupId =
new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
@@ -317,6 +318,7 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
final TCommitId tCommitId =
new TCommitId(
pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
+ pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
pipeInsertNodeTabletInsertionEvent.getRebootTimes());
final TConsensusGroupId tConsensusGroupId =
new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
@@ -376,6 +378,7 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
final TCommitId tCommitId =
new TCommitId(
pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(),
+ pipeTsFileInsertionEvent.getCommitterKey().getRestartTimes(),
pipeTsFileInsertionEvent.getRebootTimes());
final TConsensusGroupId tConsensusGroupId =
new TConsensusGroupId(TConsensusGroupType.DataRegion,
consensusGroupId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index e86193dad17..f39613cf2c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -183,6 +183,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder implements AutoClosea
commitId =
new TCommitId(
pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
+ pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
pipeInsertNodeTabletInsertionEvent.getRebootTimes());
// Read the bytebuffer from the wal file and transfer it directly without
serializing or
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 10572e0a881..a996b6b9c21 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1325,6 +1325,9 @@ public class PipeConsensusReceiver {
* although events can arrive receiver in a random sequence.
*/
private class RequestExecutor {
+ private static final String THIS_NODE = "this node";
+ private static final String PIPE_TASK = "pipe task";
+
// An ordered set that buffers transfer requests' TCommitId, whose length
is not larger than
// PIPE_CONSENSUS_PIPELINE_SIZE.
// Here we use set is to avoid duplicate events being received in some
special cases
@@ -1337,6 +1340,7 @@ public class PipeConsensusReceiver {
private final AtomicInteger tsFileEventCount = new AtomicInteger(0);
private volatile long onSyncedReplicateIndex = 0;
private volatile int connectorRebootTimes = 0;
+ private volatile int pipeTaskRestartTimes = 0;
public RequestExecutor(
PipeConsensusReceiverMetrics metric, PipeConsensusTsFileWriterPool
tsFileWriterPool) {
@@ -1415,20 +1419,21 @@ public class PipeConsensusReceiver {
// the request with incremental rebootTimes, the {3} sent before the
leader restart needs to
// be discarded.
if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
- final TSStatus status =
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
- "PipeConsensus receiver received a deprecated request,
which may be sent before the connector restart. Consider to discard it"));
- LOGGER.info(
- "PipeConsensus-PipeName-{}: received a deprecated request, which
may be sent before the connector restart. Consider to discard it",
- consensusPipeName);
- return new TPipeConsensusTransferResp(status);
+ return deprecatedResp(THIS_NODE);
+ }
+ // Similarly, check pipeTask restartTimes
+ if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+ && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
+ return deprecatedResp(PIPE_TASK);
}
// Judge whether connector has rebooted or not, if the rebootTimes
increases compared to
// connectorRebootTimes, need to reset receiver because connector has
been restarted.
if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
- resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes(),
condition);
+ resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes());
+ }
+ // Similarly, check pipeTask restartTimes
+ if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) {
+ resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes());
}
// update metric
if (isTransferTsFilePiece &&
!reqExecutionOrderBuffer.contains(requestMeta)) {
@@ -1476,14 +1481,19 @@ public class PipeConsensusReceiver {
if (resp != null
&& resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
onSuccess(tCommitId, isTransferTsFileSeal);
- // signal all other reqs to accelerate dispatch process.
- condition.signalAll();
}
return resp;
}
if (reqExecutionOrderBuffer.size() >=
IOTDB_CONFIG.getIotConsensusV2PipelineSize()
&& reqExecutionOrderBuffer.first().equals(requestMeta)) {
+ // TODO: Turn it to debug after GA
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: no.{} event get executed because
receiver buffer's len >= pipeline, current receiver syncIndex {}, current
buffer len {}",
+ consensusPipeName,
+ tCommitId,
+ onSyncedReplicateIndex,
+ reqExecutionOrderBuffer.size());
long startApplyNanos = System.nanoTime();
metric.recordDispatchWaitingTimer(startApplyNanos -
startDispatchNanos);
requestMeta.setStartApplyNanos(startApplyNanos);
@@ -1493,8 +1503,6 @@ public class PipeConsensusReceiver {
if (resp != null
&& resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
onSuccess(tCommitId, isTransferTsFileSeal);
- // signal all other reqs that may wait for this event
- condition.signalAll();
}
return resp;
} else {
@@ -1511,15 +1519,7 @@ public class PipeConsensusReceiver {
// pipeTaskStartTimes or rebootTimes came in and refreshed the
requestBuffer. In that
// cases we need to discard these requests.
if (!reqExecutionOrderBuffer.contains(requestMeta)) {
- final TSStatus status =
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
- "PipeConsensus receiver received a deprecated
request, which may be sent before the connector restart or pipe task restart.
Consider to discard it"));
- LOGGER.info(
- "PipeConsensus-PipeName-{}: received a deprecated request,
which may be sent before the connector restart or pipe task restart. Consider
to discard it",
- consensusPipeName);
- return new TPipeConsensusTransferResp(status);
+ return deprecatedResp(String.format("%s or %s", THIS_NODE,
PIPE_TASK));
}
// If the buffer is not full after waiting timeout, we suppose
that the sender will
// not send any more events at this time, that is, the sender
has sent all events. At
@@ -1528,6 +1528,12 @@ public class PipeConsensusReceiver {
&& reqExecutionOrderBuffer.size() <
IOTDB_CONFIG.getIotConsensusV2PipelineSize()
&& reqExecutionOrderBuffer.first() != null
&& reqExecutionOrderBuffer.first().equals(requestMeta)) {
+ // TODO: Turn it to debug after GA
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: no.{} event get executed after
awaiting timeout, current receiver syncIndex: {}",
+ consensusPipeName,
+ tCommitId,
+ onSyncedReplicateIndex);
long startApplyNanos = System.nanoTime();
metric.recordDispatchWaitingTimer(startApplyNanos -
startDispatchNanos);
requestMeta.setStartApplyNanos(startApplyNanos);
@@ -1536,8 +1542,6 @@ public class PipeConsensusReceiver {
if (resp != null
&& resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
onSuccess(tCommitId, isTransferTsFileSeal);
- // signal all other reqs that may wait for this event
- condition.signalAll();
}
return resp;
}
@@ -1557,6 +1561,9 @@ public class PipeConsensusReceiver {
}
}
} finally {
+ // let all threads that may still await become active again to acquire
lock instead of
+ // meaningless sleeping in the condition while lock is already
released.
+ condition.signalAll();
lock.unlock();
}
}
@@ -1565,17 +1572,26 @@ public class PipeConsensusReceiver {
* Reset all data to initial status and set connectorRebootTimes properly.
This method is called
* when receiver identifies connector has rebooted.
*/
- private void resetWithNewestRebootTime(int connectorRebootTimes, Condition
condition) {
+ private void resetWithNewestRebootTime(int connectorRebootTimes) {
LOGGER.info(
"PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes,
which indicates the leader has rebooted. receiver will reset all its data.",
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after
dataNode reboots, it's
// safe to clear all events in buffer.
clear();
- // signal all deprecated requests that may wait on condition to expire
them
- condition.signalAll();
// sync the follower's connectorRebootTimes with connector's actual
rebootTimes.
this.connectorRebootTimes = connectorRebootTimes;
+ this.pipeTaskRestartTimes = 0;
+ }
+
+ private void resetWithNewestRestartTime(int pipeTaskRestartTimes) {
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: receiver detected an newer
pipeTaskRestartTimes, which indicates the pipe task has restarted. receiver
will reset all its data.",
+ consensusPipeName);
+ // since pipe task will resend all data that hasn't synchronized after
restarts, it's safe to
+ // clear all events in buffer.
+ clear();
+ this.pipeTaskRestartTimes = pipeTaskRestartTimes;
}
private void clear() {
@@ -1583,6 +1599,21 @@ public class PipeConsensusReceiver {
this.tsFileWriterPool.handleExit(consensusPipeName);
this.onSyncedReplicateIndex = 0;
}
+
+ private TPipeConsensusTransferResp deprecatedResp(String msg) {
+ final TSStatus status =
+ new TSStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
+ String.format(
+ "PipeConsensus receiver received a deprecated request,
which may be sent before %s restarts. Consider to discard it",
+ msg)));
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: received a deprecated request, which may
be sent before {} restarts. Consider to discard it",
+ consensusPipeName,
+ msg);
+ return new TPipeConsensusTransferResp(status);
+ }
}
private static class RequestMeta {
diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
index 9bae689e692..56ef64a606b 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift
@@ -22,7 +22,8 @@ namespace java org.apache.iotdb.consensus.pipe.thrift
struct TCommitId {
1:required i64 replicateIndex
- 2:required i32 dataNodeRebootTimes
+ 2:required i32 pipeTaskRestartTimes
+ 3:required i32 dataNodeRebootTimes
}
struct TPipeConsensusTransferReq {