This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3fd0c4c50fe2b67c9d2ca046b4194d56943dc8d3
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Jul 22 21:02:16 2024 -0500

    Pipe/PipeConsensus: Fix invalid retry count in report & enhance log in pipe consensus (#12989)

    (cherry picked from commit f5f0a3401f8898a8ba6085bb3b1834eafa4bb0bd)
---
 .../protocol/pipeconsensus/PipeConsensusReceiver.java    | 13 ++++++++++---
 .../commons/pipe/task/subtask/PipeReportableSubtask.java |  2 +-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index ce289ec341a..2480ac702be 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -200,7 +200,10 @@ public class PipeConsensusReceiver {
 
     PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
     if (impl == null) {
-      String message = String.format("PipeConsensus: unexpected consensusGroupId %s", groupId);
+      String message =
+          String.format(
+              "PipeConsensus-PipeName-%s: unexpected consensusGroupId %s",
+              consensusPipeName, groupId);
       if (LOGGER.isErrorEnabled()) {
         LOGGER.error(message);
       }
@@ -210,7 +213,8 @@ public class PipeConsensusReceiver {
     if (impl.isReadOnly()) {
       String message =
           String.format(
-              "PipeConsensus-PipeName-%s: fail to receive because system is read-only.", groupId);
+              "PipeConsensus-PipeName-%s: fail to receive because system is read-only.",
+              consensusPipeName);
       if (LOGGER.isErrorEnabled()) {
         LOGGER.error(message);
       }
@@ -221,7 +225,7 @@ public class PipeConsensusReceiver {
       String message =
           String.format(
               "PipeConsensus-PipeName-%s: fail to receive because peer is inactive and not ready.",
-              groupId);
+              consensusPipeName);
       if (LOGGER.isWarnEnabled()) {
         LOGGER.warn(message);
       }
@@ -1053,6 +1057,7 @@ public class PipeConsensusReceiver {
         diskBuffer.get().setUsed(true);
         diskBuffer.get().setCommitIdOfCorrespondingHolderEvent(commitId);
       } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         LOGGER.warn(
             "PipeConsensus: receiver thread get interrupted when waiting for borrowing tsFileWriter.");
       } finally {
@@ -1074,6 +1079,7 @@ public class PipeConsensusReceiver {
       try {
         Thread.sleep(RETRY_WAIT_TIME);
       } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         LOGGER.warn(
             "PipeConsensus-PipeName-{}: receiver thread get interrupted when exiting.",
             consensusPipeName.toString());
@@ -1373,6 +1379,7 @@ public class PipeConsensusReceiver {
           return resp;
         }
       } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         LOGGER.warn(
             "PipeConsensus-PipeName-{}: current waiting is interrupted. onSyncedCommitIndex: {}. Exception: ",
             consensusPipeName,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index 462644db4df..c366b4d62e4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -73,7 +73,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
     }
 
     retryCount.incrementAndGet();
-    if (retryCount.get() <= MAX_RETRY_TIMES) {
+    if (retryCount.get() <= maxRetryTimes) {
       LOGGER.warn(
           "Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}], last exception: {}",
           taskID,
