This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 77e87e494fc [To dev/1.3] Pipe: Reduced the progress index report
interval & Added some logs (#15905) (#15908)
77e87e494fc is described below
commit 77e87e494fcc20e45e180886c782f14cdf3bd499
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 10 19:24:46 2025 +0800
[To dev/1.3] Pipe: Reduced the progress index report interval & Added some
logs (#15905) (#15908)
* partial
* Changed default
* Update PipeDataNodeTaskAgent.java
* Next
---
.../pipe/agent/task/PipeConfigNodeTaskAgent.java | 20 ++++++------
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 37 ++++++++++++----------
.../apache/iotdb/commons/conf/CommonConfig.java | 10 +++---
.../iotdb/commons/pipe/config/PipeConfig.java | 2 +-
.../iotdb/commons/pipe/config/PipeDescriptor.java | 2 +-
.../commons/pipe/resource/log/PipeLogManager.java | 2 +-
.../commons/pipe/resource/log/PipeLogStatus.java | 4 +--
7 files changed, 39 insertions(+), 38 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index b115ba25b4a..ae20141c81f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -205,21 +205,19 @@ public class PipeConfigNodeTaskAgent extends
PipeTaskAgent {
if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
return;
}
-
- LOGGER.info("Received pipe heartbeat request {} from config coordinator.",
req.heartbeatId);
+ final Optional<Logger> logger =
+ PipeConfigNodeResourceManager.log()
+ .schedule(
+ PipeConfigNodeTaskAgent.class,
+ PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+ pipeMetaKeeper.getPipeMetaCount());
+ LOGGER.debug("Received pipe heartbeat request {} from config
coordinator.", req.heartbeatId);
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
final List<Long> pipeRemainingEventCountList = new ArrayList<>();
final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
- final Optional<Logger> logger =
- PipeConfigNodeResourceManager.log()
- .schedule(
- PipeConfigNodeTaskAgent.class,
-
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
- pipeMetaKeeper.getPipeMetaCount());
-
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
@@ -242,7 +240,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
remainingEventCount,
estimatedRemainingTime));
}
- LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+ logger.ifPresent(l -> l.info("Reported {} pipe metas.",
pipeMetaBinaryList.size()));
} catch (final IOException e) {
throw new TException(e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 711b305285a..5d21e2970de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -400,6 +400,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
if (PipeDataNodeAgent.runtime().isShutdown()) {
return;
}
+ final Optional<Logger> logger =
+ PipeDataNodeResourceManager.log()
+ .schedule(
+ PipeDataNodeTaskAgent.class,
+ PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+ pipeMetaKeeper.getPipeMetaCount());
final Set<Integer> dataRegionIds =
StorageEngine.getInstance().getAllDataRegionIds().stream()
@@ -411,13 +418,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final List<Long> pipeRemainingEventCountList = new ArrayList<>();
final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
- final Optional<Logger> logger =
- PipeDataNodeResourceManager.log()
- .schedule(
- PipeDataNodeTaskAgent.class,
-
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
- pipeMetaKeeper.getPipeMetaCount());
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
@@ -464,7 +464,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
remainingEventAndTime.getLeft(),
remainingEventAndTime.getRight()));
}
- LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+ logger.ifPresent(l -> l.info("Reported {} pipe metas.",
pipeMetaBinaryList.size()));
} catch (final IOException | IllegalPathException e) {
throw new TException(e);
}
@@ -479,10 +479,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
protected void collectPipeMetaListInternal(
final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws
TException {
// Do nothing if data node is removing or removed, or request does not
need pipe meta list
+ // If the heartbeatId == Long.MIN_VALUE then it's shutdown report and
shall not be skipped
if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId !=
Long.MIN_VALUE) {
return;
}
- LOGGER.info("Received pipe heartbeat request {} from config node.",
req.heartbeatId);
+ final Optional<Logger> logger =
+ PipeDataNodeResourceManager.log()
+ .schedule(
+ PipeDataNodeTaskAgent.class,
+ PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+ pipeMetaKeeper.getPipeMetaCount());
+ LOGGER.debug("Received pipe heartbeat request {} from config node.",
req.heartbeatId);
final Set<Integer> dataRegionIds =
StorageEngine.getInstance().getAllDataRegionIds().stream()
@@ -494,13 +502,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final List<Long> pipeRemainingEventCountList = new ArrayList<>();
final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
- final Optional<Logger> logger =
- PipeDataNodeResourceManager.log()
- .schedule(
- PipeDataNodeTaskAgent.class,
-
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
- pipeMetaKeeper.getPipeMetaCount());
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
@@ -547,7 +548,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
remainingEventAndTime.getLeft(),
remainingEventAndTime.getRight()));
}
- LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+ logger.ifPresent(l -> l.info("Reported {} pipe metas.",
pipeMetaBinaryList.size()));
} catch (final IOException | IllegalPathException e) {
throw new TException(e);
}
@@ -839,6 +840,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
LOGGER.warn("Failed to persist progress index to configNode, status:
{}", result);
+ } else {
+ LOGGER.info("Successfully persisted all pipe's info to configNode.");
}
} catch (final Exception e) {
LOGGER.warn(e.getMessage());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 58765a9cddb..f7adde82e56 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -263,7 +263,7 @@ public class CommonConfig {
(int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
private boolean isSeperatedPipeHeartbeatEnabled = true;
- private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30;
+ private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3;
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
private long pipeMetaSyncerSyncIntervalMinutes = 3;
private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
@@ -294,8 +294,8 @@ public class CommonConfig {
private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
- private int pipeMetaReportMaxLogNumPerRound = 10;
- private int pipeMetaReportMaxLogIntervalRounds = 36;
+ private double pipeMetaReportMaxLogNumPerRound = 0.1;
+ private int pipeMetaReportMaxLogIntervalRounds = 360;
private int pipeTsFilePinMaxLogNumPerRound = 10;
private int pipeTsFilePinMaxLogIntervalRounds = 90;
private int pipeWalPinMaxLogNumPerRound = 10;
@@ -1698,11 +1698,11 @@ public class CommonConfig {
logger.info("pipeFlushAfterTerminateCount is set to {}",
pipeFlushAfterTerminateCount);
}
- public int getPipeMetaReportMaxLogNumPerRound() {
+ public double getPipeMetaReportMaxLogNumPerRound() {
return pipeMetaReportMaxLogNumPerRound;
}
- public void setPipeMetaReportMaxLogNumPerRound(int
pipeMetaReportMaxLogNumPerRound) {
+ public void setPipeMetaReportMaxLogNumPerRound(double
pipeMetaReportMaxLogNumPerRound) {
if (this.pipeMetaReportMaxLogNumPerRound ==
pipeMetaReportMaxLogNumPerRound) {
return;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index cb59931ba08..fdbda7fefd4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -393,7 +393,7 @@ public class PipeConfig {
/////////////////////////////// Logger ///////////////////////////////
- public int getPipeMetaReportMaxLogNumPerRound() {
+ public double getPipeMetaReportMaxLogNumPerRound() {
return COMMON_CONFIG.getPipeMetaReportMaxLogNumPerRound();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 6f04a652b9d..852beda7ed9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -128,7 +128,7 @@ public class PipeDescriptor {
Integer.toString(config.getPipeAirGapReceiverPort()))));
config.setPipeMetaReportMaxLogNumPerRound(
- Integer.parseInt(
+ Double.parseDouble(
properties.getProperty(
"pipe_meta_report_max_log_num_per_round",
String.valueOf(config.getPipeMetaReportMaxLogNumPerRound()))));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
index 49699fdf878..69d8b5294db 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
@@ -34,7 +34,7 @@ public class PipeLogManager {
public Optional<Logger> schedule(
final Class<?> logClass,
- final int maxAverageScale,
+ final double maxAverageScale,
final int maxLogInterval,
final int scale) {
return logClass2LogStatusMap
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
index 9348708281f..5427de9a831 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
@@ -29,11 +29,11 @@ class PipeLogStatus {
private final Logger logger;
- private final int maxAverageScale;
+ private final double maxAverageScale;
private final int maxLogInterval;
private final AtomicLong currentRounds = new AtomicLong(0);
- PipeLogStatus(final Class<?> logClass, final int maxAverageScale, final int
maxLogInterval) {
+ PipeLogStatus(final Class<?> logClass, final double maxAverageScale, final
int maxLogInterval) {
logger = LoggerFactory.getLogger(logClass);
this.maxAverageScale = maxAverageScale;