This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9d7649d990c IoTV2: Improve receiver closed and clean dir logic (#15313)
9d7649d990c is described below
commit 9d7649d990c03a925d6b7917c451ab2fe7e6fdc2
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Apr 14 15:39:17 2025 +0800
IoTV2: Improve receiver closed and clean dir logic (#15313)
* refine close for receiver
* complete
* improve
* reformat
* reformat
* spotless
* Fix review
* Fix review
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 12 +++
.../pipeconsensus/PipeConsensusReceiver.java | 86 +++++++++++-----------
.../pipeconsensus/PipeConsensusReceiverAgent.java | 36 ++++++++-
.../commons/pipe/agent/task/PipeTaskAgent.java | 2 +-
4 files changed, 92 insertions(+), 44 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0af0be13515..7e2191f8293 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -341,6 +341,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return true;
}
+ @Override
+ protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException {
+ String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
+ if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+ // Release corresponding receiver's resource
+ PipeDataNodeAgent.receiver()
+ .pipeConsensus()
+ .markConsensusPipeAsCreated(new ConsensusPipeName(pipeName));
+ }
+ return super.createPipe(pipeMetaFromCoordinator);
+ }
+
@Override
protected boolean dropPipe(final String pipeName) {
// Get the pipe meta first because it is removed after super#dropPipe(pipeName)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index f81cbdfe88f..5f353a61445 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1072,8 +1072,8 @@ public class PipeConsensusReceiver {
public synchronized void handleExit() {
// only after closing request executor, can we clean receiver.
requestExecutor.tryClose();
- // Clear the tsFileWriters and receiver base dirs
- pipeConsensusTsFileWriterPool.handleExit(consensusPipeName);
+ // Clear the tsFileWriters, receiverBuffer and receiver base dirs
+ requestExecutor.clear(false);
clearAllReceiverBaseDir();
// remove metric
MetricService.getInstance().removeMetricSet(pipeConsensusReceiverMetrics);
@@ -1425,33 +1425,6 @@ public class PipeConsensusReceiver {
this.tsFileWriterPool = tsFileWriterPool;
}
- private void onSuccess(TCommitId commitId, boolean isTransferTsFileSeal) {
- LOGGER.info(
- "PipeConsensus-PipeName-{}: process no.{} event successfully!",
- consensusPipeName,
- commitId);
- RequestMeta curMeta = reqExecutionOrderBuffer.pollFirst();
- onSyncedReplicateIndex = commitId.getReplicateIndex();
- // update metric, notice that curMeta is never null.
- if (isTransferTsFileSeal) {
- tsFileEventCount.decrementAndGet();
- metric.recordReceiveTsFileTimer(System.nanoTime() - curMeta.getStartApplyNanos());
- } else {
- WALEventCount.decrementAndGet();
- metric.recordReceiveWALTimer(System.nanoTime() - curMeta.getStartApplyNanos());
- }
- }
-
- private void tryClose() {
- // It will not be closed until all requests sent before closing are done.
- lock.lock();
- try {
- isClosed.set(true);
- } finally {
- lock.unlock();
- }
- }
-
private TPipeConsensusTransferResp onRequest(
final TPipeConsensusTransferReq req,
final boolean isTransferTsFilePiece,
@@ -1459,16 +1432,10 @@ public class PipeConsensusReceiver {
long startAcquireLockNanos = System.nanoTime();
lock.lock();
try {
+ // once thread gets lock, it will judge whether receiver is closed
if (isClosed.get()) {
- final TSStatus status =
- new TSStatus(
- RpcUtils.getStatus(
- TSStatusCode.PIPE_CONSENSUS_CLOSE_ERROR,
- "PipeConsensus receiver received a request after it was closed."));
- LOGGER.info(
- "PipeConsensus-PipeName-{}: received a request after receiver was closed and pipe task was dropped.",
- consensusPipeName);
- return new TPipeConsensusTransferResp(status);
+ return PipeConsensusReceiverAgent.closedResp(
+ consensusPipeName.toString(), req.getCommitId());
}
long startDispatchNanos = System.nanoTime();
@@ -1585,6 +1552,12 @@ public class PipeConsensusReceiver {
!condition.await(
PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS,
TimeUnit.MILLISECONDS);
+ // once thread gets lock, it will judge whether receiver is closed
+ if (isClosed.get()) {
+ return PipeConsensusReceiverAgent.closedResp(
+ consensusPipeName.toString(), req.getCommitId());
+ }
+
// If some reqs find the buffer no longer contains their requestMeta after jumping out
// from condition.await, it may indicate that during their wait, some reqs with newer
// pipeTaskStartTimes or rebootTimes came in and refreshed the requestBuffer. In that
@@ -1649,7 +1622,7 @@ public class PipeConsensusReceiver {
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after dataNode reboots, it's
// safe to clear all events in buffer.
- clear();
+ clear(true);
// sync the follower's connectorRebootTimes with connector's actual rebootTimes.
this.connectorRebootTimes = connectorRebootTimes;
this.pipeTaskRestartTimes = 0;
@@ -1661,14 +1634,43 @@ public class PipeConsensusReceiver {
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after restarts, it's safe to
// clear all events in buffer.
- clear();
+ clear(false);
this.pipeTaskRestartTimes = pipeTaskRestartTimes;
}
- private void clear() {
+ private void onSuccess(TCommitId commitId, boolean isTransferTsFileSeal) {
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: process no.{} event successfully!",
+ consensusPipeName,
+ commitId);
+ RequestMeta curMeta = reqExecutionOrderBuffer.pollFirst();
+ onSyncedReplicateIndex = commitId.getReplicateIndex();
+ // update metric, notice that curMeta is never null.
+ if (isTransferTsFileSeal) {
+ tsFileEventCount.decrementAndGet();
+ metric.recordReceiveTsFileTimer(System.nanoTime() - curMeta.getStartApplyNanos());
+ } else {
+ WALEventCount.decrementAndGet();
+ metric.recordReceiveWALTimer(System.nanoTime() - curMeta.getStartApplyNanos());
+ }
+ }
+
+ private void clear(boolean resetSyncIndex) {
this.reqExecutionOrderBuffer.clear();
this.tsFileWriterPool.handleExit(consensusPipeName);
- this.onSyncedReplicateIndex = 0;
+ if (resetSyncIndex) {
+ this.onSyncedReplicateIndex = 0;
+ }
+ }
+
+ private void tryClose() {
+ // It will not be closed until all requests sent before closing are done.
+ lock.lock();
+ try {
+ isClosed.set(true);
+ } finally {
+ lock.unlock();
+ }
}
private TPipeConsensusTransferResp deprecatedResp(String msg) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
index 524b93dda4c..6d3bef50a00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver;
+import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,7 +40,9 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -66,6 +69,8 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
ConsensusGroupId, Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>>
replicaReceiverMap = new ConcurrentHashMap<>();
+ private final Set<ConsensusPipeName> createdConsensusPipes = new CopyOnWriteArraySet<>();
+
private PipeConsensus pipeConsensus;
public PipeConsensusReceiverAgent() {
@@ -85,13 +90,32 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
}
}
+ public static TPipeConsensusTransferResp closedResp(String consensusInfo, TCommitId tCommitId) {
+ final TSStatus status =
+ new TSStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_CONSENSUS_CLOSE_ERROR,
+ "PipeConsensus receiver received a request after it was closed."));
+ LOGGER.info(
+ "PipeConsensus-{}: receive on-the-fly no.{} event after consensus pipe was dropped, discard it",
+ consensusInfo,
+ tCommitId);
+ return new TPipeConsensusTransferResp(status);
+ }
+
@Override
public TPipeConsensusTransferResp receive(TPipeConsensusTransferReq req) {
final byte reqVersion = req.getVersion();
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
final ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- return getReceiver(consensusGroupId, req.getDataNodeId(), reqVersion).receive(req);
+ final PipeConsensusReceiver receiver =
+ getReceiver(consensusGroupId, req.getDataNodeId(), reqVersion);
+
+ if (receiver == null) {
+ return closedResp(consensusGroupId.toString(), req.getCommitId());
+ }
+ return receiver.receive(req);
} else {
final TSStatus status =
RpcUtils.getStatus(
@@ -111,6 +135,11 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
// 2. Route to given consensusPipeTask's receiver
ConsensusPipeName consensusPipeName =
new ConsensusPipeName(consensusGroupId, leaderDataNodeId, thisNodeId);
+ // 3. Judge whether pipe task was dropped
+ if (!createdConsensusPipes.contains(consensusPipeName)) {
+ return null;
+ }
+
AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
AtomicReference<PipeConsensusReceiver> receiverReference =
consensusPipe2ReceiverMap.computeIfAbsent(
@@ -192,9 +221,14 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
consensusPipe2ReciverMap.getOrDefault(pipeName, null);
// 3. Release receiver
if (receiverReference != null) {
+ createdConsensusPipes.remove(pipeName);
receiverReference.get().handleExit();
receiverReference.set(null);
consensusPipe2ReciverMap.remove(pipeName);
}
}
+
+ public void markConsensusPipeAsCreated(ConsensusPipeName pipeName) {
+ createdConsensusPipes.add(pipeName);
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 2e3b9d0a868..08bb1713d72 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -453,7 +453,7 @@ public abstract class PipeTaskAgent {
* if the pipe already exists or is created but should not be started
* @throws IllegalStateException if the status is illegal
*/
- private boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException {
+ protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException {
final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
final long creationTime = pipeMetaFromCoordinator.getStaticMeta().getCreationTime();