This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f5f0a3401f8 Pipe/PipeConsensus: Fix invalid retry count in report &
enhance log in pipe consensus (#12989)
f5f0a3401f8 is described below
commit f5f0a3401f8898a8ba6085bb3b1834eafa4bb0bd
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Jul 22 21:02:16 2024 -0500
Pipe/PipeConsensus: Fix invalid retry count in report & enhance log in pipe
consensus (#12989)
---
.../protocol/pipeconsensus/PipeConsensusReceiver.java | 13 ++++++++++---
.../commons/pipe/task/subtask/PipeReportableSubtask.java | 2 +-
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index ce289ec341a..2480ac702be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -200,7 +200,10 @@ public class PipeConsensusReceiver {
PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
if (impl == null) {
- String message = String.format("PipeConsensus: unexpected
consensusGroupId %s", groupId);
+ String message =
+ String.format(
+ "PipeConsensus-PipeName-%s: unexpected consensusGroupId %s",
+ consensusPipeName, groupId);
if (LOGGER.isErrorEnabled()) {
LOGGER.error(message);
}
@@ -210,7 +213,8 @@ public class PipeConsensusReceiver {
if (impl.isReadOnly()) {
String message =
String.format(
- "PipeConsensus-PipeName-%s: fail to receive because system is
read-only.", groupId);
+ "PipeConsensus-PipeName-%s: fail to receive because system is
read-only.",
+ consensusPipeName);
if (LOGGER.isErrorEnabled()) {
LOGGER.error(message);
}
@@ -221,7 +225,7 @@ public class PipeConsensusReceiver {
String message =
String.format(
"PipeConsensus-PipeName-%s: fail to receive because peer is
inactive and not ready.",
- groupId);
+ consensusPipeName);
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(message);
}
@@ -1053,6 +1057,7 @@ public class PipeConsensusReceiver {
diskBuffer.get().setUsed(true);
diskBuffer.get().setCommitIdOfCorrespondingHolderEvent(commitId);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
LOGGER.warn(
"PipeConsensus: receiver thread get interrupted when waiting for
borrowing tsFileWriter.");
} finally {
@@ -1074,6 +1079,7 @@ public class PipeConsensusReceiver {
try {
Thread.sleep(RETRY_WAIT_TIME);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
LOGGER.warn(
"PipeConsensus-PipeName-{}: receiver thread get
interrupted when exiting.",
consensusPipeName.toString());
@@ -1373,6 +1379,7 @@ public class PipeConsensusReceiver {
return resp;
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
LOGGER.warn(
"PipeConsensus-PipeName-{}: current waiting is interrupted.
onSyncedCommitIndex: {}. Exception: ",
consensusPipeName,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index 462644db4df..c366b4d62e4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -73,7 +73,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
}
retryCount.incrementAndGet();
- if (retryCount.get() <= MAX_RETRY_TIMES) {
+ if (retryCount.get() <= maxRetryTimes) {
LOGGER.warn(
"Retry executing subtask {} (creation time: {}, simple class: {}),
retry count [{}/{}], last exception: {}",
taskID,