This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb51c67c71 IoTConsensusV2: Fix reqBuffer reset and receive concurrent bug (#14215)
0bb51c67c71 is described below
commit 0bb51c67c71b84e80f34b8232d8b5f1b2bbe229a
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Nov 28 10:02:24 2024 +0800
IoTConsensusV2: Fix reqBuffer reset and receive concurrent bug (#14215)
* fix reqBuffer reset and receive concurrent bug
* fix review
---
.../pipeconsensus/PipeConsensusReceiver.java | 27 ++++++++++++++++++----
1 file changed, 23 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 20164ff2293..32ead7587e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1391,11 +1391,11 @@ public class PipeConsensusReceiver {
// Judge whether connector has rebooted or not, if the rebootTimes
increases compared to
// connectorRebootTimes, need to reset receiver because connector has
been restarted.
if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
- resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes());
+ resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes(),
condition);
}
// Similarly, check pipeTask restartTimes
if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) {
- resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes());
+ resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes(),
condition);
}
// update metric
if (isTransferTsFilePiece &&
!reqExecutionOrderBuffer.contains(requestMeta)) {
@@ -1471,6 +1471,21 @@ public class PipeConsensusReceiver {
!condition.await(
PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS,
TimeUnit.MILLISECONDS);
+ // If some reqs find the buffer no longer contains their
requestMeta after jumping out
+ // from condition.await, it may indicate that during their wait,
some reqs with newer
+ // pipeTaskStartTimes or rebootTimes came in and refreshed the
requestBuffer. In that
+ // cases we need to discard these requests.
+ if (!reqExecutionOrderBuffer.contains(requestMeta)) {
+ final TSStatus status =
+ new TSStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
+ "PipeConsensus receiver received a deprecated
request, which may be sent before the connector restart or pipe task restart.
Consider to discard it"));
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: received a deprecated request,
which may be sent before the connector restart or pipe task restart. Consider
to discard it",
+ consensusPipeName);
+ return new TPipeConsensusTransferResp(status);
+ }
// If the buffer is not full after waiting timeout, we suppose
that the sender will
// not send any more events at this time, that is, the sender
has sent all events. At
// this point we apply the event at reqBuffer's peek
@@ -1515,26 +1530,30 @@ public class PipeConsensusReceiver {
* Reset all data to initial status and set connectorRebootTimes properly.
This method is called
* when receiver identifies connector has rebooted.
*/
- private void resetWithNewestRebootTime(int connectorRebootTimes) {
+ private void resetWithNewestRebootTime(int connectorRebootTimes, Condition
condition) {
LOGGER.info(
"PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes,
which indicates the leader has rebooted. receiver will reset all its data.",
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after
dataNode reboots, it's
// safe to clear all events in buffer.
clear();
+ // signal all deprecated requests that may wait on condition to expire
them
+ condition.signalAll();
// sync the follower's connectorRebootTimes with connector's actual
rebootTimes.
this.connectorRebootTimes = connectorRebootTimes;
// Note: dataNode rebooting will reset pipeTaskRestartTimes.
this.pipeTaskRestartTimes = 0;
}
- private void resetWithNewestRestartTime(int pipeTaskRestartTimes) {
+ private void resetWithNewestRestartTime(int pipeTaskRestartTimes,
Condition condition) {
LOGGER.info(
"PipeConsensus-PipeName-{}: receiver detected an newer
pipeTaskRestartTimes, which indicates the pipe task has restarted. receiver
will reset all its data.",
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after
restarts, it's safe to
// clear all events in buffer.
clear();
+ // signal all deprecated requests that may wait on condition to expire
them
+ condition.signalAll();
this.pipeTaskRestartTimes = pipeTaskRestartTimes;
}