This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push: new c78546d27ef IoTV2: Receiver aware of pipe restart & Improve condition schedule (#15314) c78546d27ef is described below commit c78546d27ef185f82d8d9d8cb6210183fc6812cd Author: Peng Junzhi <78788603+peng...@users.noreply.github.com> AuthorDate: Fri Apr 11 21:01:05 2025 +0800 IoTV2: Receiver aware of pipe restart & Improve condition schedule (#15314) * receiver aware of pipe restart * improve condition schedule * fix comment * improve log --- .../pipeconsensus/PipeConsensusAsyncConnector.java | 2 + .../pipeconsensus/PipeConsensusSyncConnector.java | 3 + .../PipeConsensusTransferBatchReqBuilder.java | 1 + .../pipeconsensus/PipeConsensusReceiver.java | 87 +++++++++++++++------- .../src/main/thrift/pipeconsensus.thrift | 3 +- 5 files changed, 67 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java index a44c30fca63..1fdb12df8d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java @@ -276,6 +276,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse tCommitId = new TCommitId( pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(), + pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(), pipeInsertNodeTabletInsertionEvent.getRebootTimes()); // We increase the reference count for this event to determine if the event may be released. @@ -354,6 +355,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse TCommitId tCommitId = new TCommitId( pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(), + pipeTsFileInsertionEvent.getCommitterKey().getRestartTimes(), pipeTsFileInsertionEvent.getRebootTimes()); TConsensusGroupId tConsensusGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java index c92f99fb3ac..5365666a7a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java @@ -248,6 +248,7 @@ public class PipeConsensusSyncConnector extends IoTDBConnector { final TCommitId tCommitId = new TCommitId( pipeDeleteDataNodeEvent.getReplicateIndexForIoTV2(), + pipeDeleteDataNodeEvent.getCommitterKey().getRestartTimes(), pipeDeleteDataNodeEvent.getRebootTimes()); final TConsensusGroupId tConsensusGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId); @@ -317,6 +318,7 @@ public class PipeConsensusSyncConnector extends IoTDBConnector { final TCommitId tCommitId = new TCommitId( pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(), + pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(), pipeInsertNodeTabletInsertionEvent.getRebootTimes()); final TConsensusGroupId tConsensusGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId); @@ -376,6 +378,7 @@ public class PipeConsensusSyncConnector extends IoTDBConnector { final TCommitId tCommitId = new TCommitId( pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(), + pipeTsFileInsertionEvent.getCommitterKey().getRestartTimes(), pipeTsFileInsertionEvent.getRebootTimes()); final TConsensusGroupId tConsensusGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java index e86193dad17..f39613cf2c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java @@ -183,6 +183,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder implements AutoClosea commitId = new TCommitId( pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(), + pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(), pipeInsertNodeTabletInsertionEvent.getRebootTimes()); // Read the bytebuffer from the wal file and transfer it directly without serializing or diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 10572e0a881..a996b6b9c21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -1325,6 +1325,9 @@ public class PipeConsensusReceiver { * although events can arrive receiver in a random sequence. */ private class RequestExecutor { + private static final String THIS_NODE = "this node"; + private static final String PIPE_TASK = "pipe task"; + // An ordered set that buffers transfer requests' TCommitId, whose length is not larger than // PIPE_CONSENSUS_PIPELINE_SIZE. // Here we use set is to avoid duplicate events being received in some special cases @@ -1337,6 +1340,7 @@ public class PipeConsensusReceiver { private final AtomicInteger tsFileEventCount = new AtomicInteger(0); private volatile long onSyncedReplicateIndex = 0; private volatile int connectorRebootTimes = 0; + private volatile int pipeTaskRestartTimes = 0; public RequestExecutor( PipeConsensusReceiverMetrics metric, PipeConsensusTsFileWriterPool tsFileWriterPool) { @@ -1415,20 +1419,21 @@ public class PipeConsensusReceiver { // the request with incremental rebootTimes, the {3} sent before the leader restart needs to // be discarded. if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) { - final TSStatus status = - new TSStatus( - RpcUtils.getStatus( - TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST, - "PipeConsensus receiver received a deprecated request, which may be sent before the connector restart. Consider to discard it")); - LOGGER.info( - "PipeConsensus-PipeName-{}: received a deprecated request, which may be sent before the connector restart. Consider to discard it", - consensusPipeName); - return new TPipeConsensusTransferResp(status); + return deprecatedResp(THIS_NODE); + } + // Similarly, check pipeTask restartTimes + if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes + && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) { + return deprecatedResp(PIPE_TASK); } // Judge whether connector has rebooted or not, if the rebootTimes increases compared to // connectorRebootTimes, need to reset receiver because connector has been restarted. if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) { - resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes(), condition); + resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes()); + } + // Similarly, check pipeTask restartTimes + if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) { + resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes()); } // update metric if (isTransferTsFilePiece && !reqExecutionOrderBuffer.contains(requestMeta)) { @@ -1476,14 +1481,19 @@ public class PipeConsensusReceiver { if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { onSuccess(tCommitId, isTransferTsFileSeal); - // signal all other reqs to accelerate dispatch process. - condition.signalAll(); } return resp; } if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize() && reqExecutionOrderBuffer.first().equals(requestMeta)) { + // TODO: Turn it to debug after GA + LOGGER.info( + "PipeConsensus-PipeName-{}: no.{} event get executed because receiver buffer's len >= pipeline, current receiver syncIndex {}, current buffer len {}", + consensusPipeName, + tCommitId, + onSyncedReplicateIndex, + reqExecutionOrderBuffer.size()); long startApplyNanos = System.nanoTime(); metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos); requestMeta.setStartApplyNanos(startApplyNanos); @@ -1493,8 +1503,6 @@ public class PipeConsensusReceiver { if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { onSuccess(tCommitId, isTransferTsFileSeal); - // signal all other reqs that may wait for this event - condition.signalAll(); } return resp; } else { @@ -1511,15 +1519,7 @@ public class PipeConsensusReceiver { // pipeTaskStartTimes or rebootTimes came in and refreshed the requestBuffer. In that // cases we need to discard these requests. if (!reqExecutionOrderBuffer.contains(requestMeta)) { - final TSStatus status = - new TSStatus( - RpcUtils.getStatus( - TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST, - "PipeConsensus receiver received a deprecated request, which may be sent before the connector restart or pipe task restart. Consider to discard it")); - LOGGER.info( - "PipeConsensus-PipeName-{}: received a deprecated request, which may be sent before the connector restart or pipe task restart. Consider to discard it", - consensusPipeName); - return new TPipeConsensusTransferResp(status); + return deprecatedResp(String.format("%s or %s", THIS_NODE, PIPE_TASK)); } // If the buffer is not full after waiting timeout, we suppose that the sender will // not send any more events at this time, that is, the sender has sent all events. At @@ -1528,6 +1528,12 @@ public class PipeConsensusReceiver { && reqExecutionOrderBuffer.size() < IOTDB_CONFIG.getIotConsensusV2PipelineSize() && reqExecutionOrderBuffer.first() != null && reqExecutionOrderBuffer.first().equals(requestMeta)) { + // TODO: Turn it to debug after GA + LOGGER.info( + "PipeConsensus-PipeName-{}: no.{} event get executed after awaiting timeout, current receiver syncIndex: {}", + consensusPipeName, + tCommitId, + onSyncedReplicateIndex); long startApplyNanos = System.nanoTime(); metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos); requestMeta.setStartApplyNanos(startApplyNanos); @@ -1536,8 +1542,6 @@ public class PipeConsensusReceiver { if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { onSuccess(tCommitId, isTransferTsFileSeal); - // signal all other reqs that may wait for this event - condition.signalAll(); } return resp; } @@ -1557,6 +1561,9 @@ public class PipeConsensusReceiver { } } } finally { + // let all threads that may still await become active again to acquire lock instead of + // meaningless sleeping in the condition while lock is already released. + condition.signalAll(); lock.unlock(); } } @@ -1565,17 +1572,26 @@ public class PipeConsensusReceiver { * Reset all data to initial status and set connectorRebootTimes properly. This method is called * when receiver identifies connector has rebooted. */ - private void resetWithNewestRebootTime(int connectorRebootTimes, Condition condition) { + private void resetWithNewestRebootTime(int connectorRebootTimes) { LOGGER.info( "PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes, which indicates the leader has rebooted. receiver will reset all its data.", consensusPipeName); // since pipe task will resend all data that hasn't synchronized after dataNode reboots, it's // safe to clear all events in buffer. clear(); - // signal all deprecated requests that may wait on condition to expire them - condition.signalAll(); // sync the follower's connectorRebootTimes with connector's actual rebootTimes. this.connectorRebootTimes = connectorRebootTimes; + this.pipeTaskRestartTimes = 0; + } + + private void resetWithNewestRestartTime(int pipeTaskRestartTimes) { + LOGGER.info( + "PipeConsensus-PipeName-{}: receiver detected an newer pipeTaskRestartTimes, which indicates the pipe task has restarted. receiver will reset all its data.", + consensusPipeName); + // since pipe task will resend all data that hasn't synchronized after restarts, it's safe to + // clear all events in buffer. + clear(); + this.pipeTaskRestartTimes = pipeTaskRestartTimes; } private void clear() { @@ -1583,6 +1599,21 @@ public class PipeConsensusReceiver { this.tsFileWriterPool.handleExit(consensusPipeName); this.onSyncedReplicateIndex = 0; } + + private TPipeConsensusTransferResp deprecatedResp(String msg) { + final TSStatus status = + new TSStatus( + RpcUtils.getStatus( + TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST, + String.format( + "PipeConsensus receiver received a deprecated request, which may be sent before %s restarts. Consider to discard it", + msg))); + LOGGER.info( + "PipeConsensus-PipeName-{}: received a deprecated request, which may be sent before {} restarts. Consider to discard it", + consensusPipeName, + msg); + return new TPipeConsensusTransferResp(status); + } } private static class RequestMeta { diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift index 9bae689e692..56ef64a606b 100644 --- a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift +++ b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift @@ -22,7 +22,8 @@ namespace java org.apache.iotdb.consensus.pipe.thrift struct TCommitId { 1:required i64 replicateIndex - 2:required i32 dataNodeRebootTimes + 2:required i32 pipeTaskRestartTimes + 3:required i32 dataNodeRebootTimes } struct TPipeConsensusTransferReq {