This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 8657782e4c8 Pipe: Optimized logger for temporarily out of memory exception & Do not stop pipe for "Waited for memory to parse TsFile" (#17542) (#17728)
8657782e4c8 is described below
commit 8657782e4c8bbfea602156005ed80ece0f29cc62
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 20 18:06:17 2026 +0800
Pipe: Optimized logger for temporarily out of memory exception & Do not stop pipe for "Waited for memory to parse TsFile" (#17542) (#17728)
---
.../subtask/processor/PipeProcessorSubtask.java | 8 ++++----
.../common/tsfile/PipeTsFileInsertionEvent.java | 21 +++++++++++++--------
2 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index ca5a8d0f4db..193693c5a95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -191,15 +191,15 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
PipeLogger.log(
LOGGER::info,
- e,
- "Temporarily out of memory in pipe event processing, will wait for
the memory to release.");
+ "Temporarily out of memory in pipe event processing, will wait for
the memory to release. Message: %s",
+ e.getMessage());
return false;
} catch (final Exception e) {
if (ExceptionUtils.getRootCause(e) instanceof
PipeRuntimeOutOfMemoryCriticalException) {
PipeLogger.log(
LOGGER::info,
- e,
- "Temporarily out of memory in pipe event processing, will wait for
the memory to release.");
+ "Temporarily out of memory in pipe event processing, will wait for
the memory to release. Message: %s",
+ e.getMessage());
return false;
}
if (!isClosed.get()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 53f0b16826a..8ffbc9f2f9b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -495,8 +496,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
callerName,
getTsFile(),
tabletEventCount,
- retryCount,
- e);
+ retryCount);
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"{}: failed to allocate memory for parsing TsFile {}, tablet
event no. {}, retry count is {}, will keep retrying.",
@@ -542,7 +542,11 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
"Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath())
: String.format(
"Parse TsFile %s error. Because: %s",
resource.getTsFilePath(), e.getMessage());
- LOGGER.warn(errorMsg, e);
+ if (e instanceof PipeRuntimeOutOfMemoryCriticalException) {
+ PipeLogger.log(LOGGER::warn, errorMsg);
+ } else {
+ PipeLogger.log(LOGGER::warn, e, errorMsg);
+ }
throw new PipeException(errorMsg);
}
}
@@ -566,28 +570,29 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
if (elapsedRecordTimeSeconds > 10.0) {
LOGGER.info(
- "Wait for resource enough for parsing {} for {} seconds.",
+ "Wait for memory enough for parsing {} for {} seconds.",
resource != null ? resource.getTsFilePath() : "tsfile",
waitTimeSeconds);
lastRecordTime = currentTime;
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "Wait for resource enough for parsing {} for {} seconds.",
+ "Wait for memory enough for parsing {} for {} seconds.",
resource != null ? resource.getTsFilePath() : "tsfile",
waitTimeSeconds);
}
if (waitTimeSeconds * 1000 > timeoutMs) {
// should contain 'TimeoutException' in exception message
- throw new PipeException(
- String.format("TimeoutException: Waited %s seconds",
waitTimeSeconds));
+ throw new PipeRuntimeOutOfMemoryCriticalException(
+ String.format(
+ "TimeoutException: Waited %s seconds for memory to parse
TsFile", waitTimeSeconds));
}
}
final long currentTime = System.currentTimeMillis();
final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
LOGGER.info(
- "Wait for resource enough for parsing {} for {} seconds.",
+ "Wait for memory enough for parsing {} for {} seconds.",
resource != null ? resource.getTsFilePath() : "tsfile",
waitTimeSeconds);
}