This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch cp-gr in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 66fd0c0549a71ae1c40b4c55444fbabb117bb7f6 Author: Caideyipi <[email protected]> AuthorDate: Fri Apr 24 11:15:04 2026 +0800 Pipe: Optimized logger for temporarily out of memory exception & Do not stop pipe for "Waited for memory to parse TsFile" (#17542) --- .../subtask/processor/PipeProcessorSubtask.java | 8 ++++---- .../common/tsfile/PipeTsFileInsertionEvent.java | 21 +++++++++++++-------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index ca5a8d0f4db..193693c5a95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -191,15 +191,15 @@ public class PipeProcessorSubtask extends PipeReportableSubtask { } catch (final PipeRuntimeOutOfMemoryCriticalException e) { PipeLogger.log( LOGGER::info, - e, - "Temporarily out of memory in pipe event processing, will wait for the memory to release."); + "Temporarily out of memory in pipe event processing, will wait for the memory to release. Message: %s", + e.getMessage()); return false; } catch (final Exception e) { if (ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) { PipeLogger.log( LOGGER::info, - e, - "Temporarily out of memory in pipe event processing, will wait for the memory to release."); + "Temporarily out of memory in pipe event processing, will wait for the memory to release. Message: %s", + e.getMessage()); return false; } if (!isClosed.get()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 53f0b16826a..8ffbc9f2f9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -495,8 +496,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent callerName, getTsFile(), tabletEventCount, - retryCount, - e); + retryCount); } else if (LOGGER.isDebugEnabled()) { LOGGER.debug( "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", @@ -542,7 +542,11 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath()) : String.format( "Parse TsFile %s error. Because: %s", resource.getTsFilePath(), e.getMessage()); - LOGGER.warn(errorMsg, e); + if (e instanceof PipeRuntimeOutOfMemoryCriticalException) { + PipeLogger.log(LOGGER::warn, errorMsg); + } else { + PipeLogger.log(LOGGER::warn, e, errorMsg); + } throw new PipeException(errorMsg); } } @@ -566,28 +570,29 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent final double waitTimeSeconds = (currentTime - startTime) / 1000.0; if (elapsedRecordTimeSeconds > 10.0) { LOGGER.info( - "Wait for resource enough for parsing {} for {} seconds.", + "Wait for memory enough for parsing {} for {} seconds.", resource != null ? resource.getTsFilePath() : "tsfile", waitTimeSeconds); lastRecordTime = currentTime; } else if (LOGGER.isDebugEnabled()) { LOGGER.debug( - "Wait for resource enough for parsing {} for {} seconds.", + "Wait for memory enough for parsing {} for {} seconds.", resource != null ? resource.getTsFilePath() : "tsfile", waitTimeSeconds); } if (waitTimeSeconds * 1000 > timeoutMs) { // should contain 'TimeoutException' in exception message - throw new PipeException( - String.format("TimeoutException: Waited %s seconds", waitTimeSeconds)); + throw new PipeRuntimeOutOfMemoryCriticalException( + String.format( + "TimeoutException: Waited %s seconds for memory to parse TsFile", waitTimeSeconds)); } } final long currentTime = System.currentTimeMillis(); final double waitTimeSeconds = (currentTime - startTime) / 1000.0; LOGGER.info( - "Wait for resource enough for parsing {} for {} seconds.", + "Wait for memory enough for parsing {} for {} seconds.", resource != null ? resource.getTsFilePath() : "tsfile", waitTimeSeconds); }
