This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch print-pipe-error in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 14453dd5d701cc8671344cb04921ea93efb12eb6 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Jan 16 17:42:07 2024 +0800 Pipe: print more info about event and task when error occurs in pipe subtask --- .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 41 +++++++++++++++++++--- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 13 +++---- .../common/tablet/PipeRawTabletInsertionEvent.java | 11 ++++++ .../common/tsfile/PipeTsFileInsertionEvent.java | 19 +++++----- .../subtask/connector/PipeConnectorSubtask.java | 16 +++++---- .../subtask/processor/PipeProcessorSubtask.java | 14 ++++---- 6 files changed, 80 insertions(+), 34 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index 1f7b794327b..a2a98f78ad0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -41,16 +41,16 @@ public abstract class EnrichedEvent implements Event { private static final Logger LOGGER = LoggerFactory.getLogger(EnrichedEvent.class); - private final AtomicInteger referenceCount; + protected final AtomicInteger referenceCount; protected final String pipeName; protected final PipeTaskMeta pipeTaskMeta; - private String committerKey; + protected String committerKey; public static final long NO_COMMIT_ID = -1; - private long commitId = NO_COMMIT_ID; + protected long commitId = NO_COMMIT_ID; - private final String pattern; + protected final String pattern; protected final long startTime; protected final long endTime; @@ -58,7 +58,7 @@ public abstract class EnrichedEvent implements Event { protected boolean isPatternParsed; protected boolean isTimeParsed; - private boolean shouldReportOnCommit = false; + protected boolean shouldReportOnCommit = false; protected EnrichedEvent( String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime) { @@ -245,4 +245,35 @@ public abstract class EnrichedEvent implements Event { reportProgress(); } } + + @Override + public String toString() { + return "EnrichedEvent{" + + "referenceCount=" + + referenceCount.get() + + ", pipeName='" + + pipeName + + '\'' + + ", pipeTaskMeta=" + + pipeTaskMeta + + ", committerKey='" + + committerKey + + '\'' + + ", commitId=" + + commitId + + ", pattern='" + + pattern + + '\'' + + ", startTime=" + + startTime + + ", endTime=" + + endTime + + ", isPatternParsed=" + + isPatternParsed + + ", isTimeParsed=" + + isTimeParsed + + ", shouldReportOnCommit=" + + shouldReportOnCommit + + '}'; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 68d8049a81f..98ab1c7b2ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -249,13 +249,10 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent @Override public String toString() { - return "PipeInsertNodeTabletInsertionEvent{" - + "walEntryHandler=" - + walEntryHandler - + ", progressIndex=" - + progressIndex - + ", isAligned=" - + isAligned - + '}'; + return String.format( + "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s, progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, dataContainer=%s}", + walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, dataContainer) + + " - " + + super.toString(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 4e30a6b1f68..03df841181c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -208,4 +208,15 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet public boolean hasNoNeedParsingAndIsEmpty() { return !shouldParsePatternOrTime() && tablet.rowSize == 0; } + + /////////////////////////// Object /////////////////////////// + + @Override + public String toString() { + return String.format( + "PipeRawTabletInsertionEvent{tablet=%s, isAligned=%s, sourceEvent=%s, needToReport=%s, allocatedMemoryBlock=%s, dataContainer=%s}", + tablet, isAligned, sourceEvent, needToReport, allocatedMemoryBlock, dataContainer) + + " - " + + super.toString(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 473c922f15f..ee2f442e104 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -222,13 +222,16 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns @Override public String toString() { - return "PipeTsFileInsertionEvent{" - + "resource=" - + resource - + ", tsFile=" - + tsFile - + ", isClosed=" - + isClosed - + '}'; + return String.format( + "PipeTsFileInsertionEvent{isTsFileFormatValid=%s, resource=%s, tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s, dataContainer=%s}", + isTsFileFormatValid, + resource, + tsFile, + isLoaded, + isGeneratedByPipe, + isClosed.get(), + dataContainer) + + " - " + + super.toString(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index 2b8f1020a2c..184c706be1c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -150,18 +150,19 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask { if (!isClosed.get()) { throw e; } else { - LOGGER.info("PipeConnectionException in pipe transfer, ignored because pipe is dropped."); + LOGGER.info( + "PipeConnectionException in pipe transfer, ignored because pipe is dropped.", e); releaseLastEvent(false); } } catch (Exception e) { if (!isClosed.get()) { throw new PipeException( - "Error occurred during executing PipeConnector#transfer, perhaps need to check " - + "whether the implementation of PipeConnector is correct " - + "according to the pipe-api description.", + String.format( + "Exception in pipe transfer, subtask: %s, last event: %s, root cause: %s", + taskID, lastEvent, ErrorHandlingUtils.getRootCause(e).getMessage()), e); } else { - LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped."); + LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.", e); releaseLastEvent(false); } } @@ -316,8 +317,9 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask { outputPipeConnector.close(); } catch (Exception e) { LOGGER.info( - "Error occurred during closing PipeConnector, perhaps need to check whether the " - + "implementation of PipeConnector is correct according to the pipe-api description.", + "Exception occurred when closing pipe connector subtask {}, root cause: {}", + taskID, + ErrorHandlingUtils.getRootCause(e).getMessage(), e); } finally { inputPendingQueue.forEach( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java index 3c7de4099ab..149541aa82b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics; import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.task.subtask.PipeDataNodeSubtask; +import org.apache.iotdb.db.utils.ErrorHandlingUtils; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -132,12 +133,12 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask { } catch (Exception e) { if (!isClosed.get()) { throw new PipeException( - "Error occurred during executing PipeProcessor#process, perhaps need to check " - + "whether the implementation of PipeProcessor is correct " - + "according to the pipe-api description.", + String.format( + "Exception in pipe process, subtask: %s, last event: %s, root cause: %s", + taskID, lastEvent, ErrorHandlingUtils.getRootCause(e).getMessage()), e); } else { - LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped."); + LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e); releaseLastEvent(false); } } @@ -163,8 +164,9 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask { pipeProcessor.close(); } catch (Exception e) { LOGGER.info( - "Error occurred during closing PipeProcessor, perhaps need to check whether the " - + "implementation of PipeProcessor is correct according to the pipe-api description.", + "Exception occurred when closing pipe processor subtask {}, root cause: {}", + taskID, + ErrorHandlingUtils.getRootCause(e).getMessage(), e); } finally { outputEventCollector.close();
