This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0746d6710c2 Pipe: print more info about event and task when error occurs in pipe subtask (#11909)
0746d6710c2 is described below
commit 0746d6710c24998dac890c06a118ebff719d6599
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jan 16 21:00:26 2024 +0800
Pipe: print more info about event and task when error occurs in pipe subtask (#11909)
---
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 41 +++++++++++++++++++---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 13 +++----
.../common/tablet/PipeRawTabletInsertionEvent.java | 11 ++++++
.../common/tsfile/PipeTsFileInsertionEvent.java | 19 +++++-----
.../subtask/connector/PipeConnectorSubtask.java | 16 +++++----
.../subtask/processor/PipeProcessorSubtask.java | 14 ++++----
6 files changed, 80 insertions(+), 34 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 1f7b794327b..a2a98f78ad0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -41,16 +41,16 @@ public abstract class EnrichedEvent implements Event {
private static final Logger LOGGER =
LoggerFactory.getLogger(EnrichedEvent.class);
- private final AtomicInteger referenceCount;
+ protected final AtomicInteger referenceCount;
protected final String pipeName;
protected final PipeTaskMeta pipeTaskMeta;
- private String committerKey;
+ protected String committerKey;
public static final long NO_COMMIT_ID = -1;
- private long commitId = NO_COMMIT_ID;
+ protected long commitId = NO_COMMIT_ID;
- private final String pattern;
+ protected final String pattern;
protected final long startTime;
protected final long endTime;
@@ -58,7 +58,7 @@ public abstract class EnrichedEvent implements Event {
protected boolean isPatternParsed;
protected boolean isTimeParsed;
- private boolean shouldReportOnCommit = false;
+ protected boolean shouldReportOnCommit = false;
protected EnrichedEvent(
String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
@@ -245,4 +245,35 @@ public abstract class EnrichedEvent implements Event {
reportProgress();
}
}
+
+ @Override
+ public String toString() {
+ return "EnrichedEvent{"
+ + "referenceCount="
+ + referenceCount.get()
+ + ", pipeName='"
+ + pipeName
+ + '\''
+ + ", pipeTaskMeta="
+ + pipeTaskMeta
+ + ", committerKey='"
+ + committerKey
+ + '\''
+ + ", commitId="
+ + commitId
+ + ", pattern='"
+ + pattern
+ + '\''
+ + ", startTime="
+ + startTime
+ + ", endTime="
+ + endTime
+ + ", isPatternParsed="
+ + isPatternParsed
+ + ", isTimeParsed="
+ + isTimeParsed
+ + ", shouldReportOnCommit="
+ + shouldReportOnCommit
+ + '}';
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 68d8049a81f..98ab1c7b2ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -249,13 +249,10 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
@Override
public String toString() {
- return "PipeInsertNodeTabletInsertionEvent{"
- + "walEntryHandler="
- + walEntryHandler
- + ", progressIndex="
- + progressIndex
- + ", isAligned="
- + isAligned
- + '}';
+ return String.format(
+ "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s,
progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, dataContainer=%s}",
+ walEntryHandler, progressIndex, isAligned, isGeneratedByPipe,
dataContainer)
+ + " - "
+ + super.toString();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 4e30a6b1f68..03df841181c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -208,4 +208,15 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
public boolean hasNoNeedParsingAndIsEmpty() {
return !shouldParsePatternOrTime() && tablet.rowSize == 0;
}
+
+ /////////////////////////// Object ///////////////////////////
+
+ @Override
+ public String toString() {
+ return String.format(
+ "PipeRawTabletInsertionEvent{tablet=%s, isAligned=%s,
sourceEvent=%s, needToReport=%s, allocatedMemoryBlock=%s, dataContainer=%s}",
+ tablet, isAligned, sourceEvent, needToReport,
allocatedMemoryBlock, dataContainer)
+ + " - "
+ + super.toString();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 473c922f15f..ee2f442e104 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -222,13 +222,16 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
@Override
public String toString() {
- return "PipeTsFileInsertionEvent{"
- + "resource="
- + resource
- + ", tsFile="
- + tsFile
- + ", isClosed="
- + isClosed
- + '}';
+ return String.format(
+ "PipeTsFileInsertionEvent{isTsFileFormatValid=%s, resource=%s,
tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s, dataContainer=%s}",
+ isTsFileFormatValid,
+ resource,
+ tsFile,
+ isLoaded,
+ isGeneratedByPipe,
+ isClosed.get(),
+ dataContainer)
+ + " - "
+ + super.toString();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 2b8f1020a2c..184c706be1c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -150,18 +150,19 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
if (!isClosed.get()) {
throw e;
} else {
- LOGGER.info("PipeConnectionException in pipe transfer, ignored because
pipe is dropped.");
+ LOGGER.info(
+ "PipeConnectionException in pipe transfer, ignored because pipe is
dropped.", e);
releaseLastEvent(false);
}
} catch (Exception e) {
if (!isClosed.get()) {
throw new PipeException(
- "Error occurred during executing PipeConnector#transfer, perhaps
need to check "
- + "whether the implementation of PipeConnector is correct "
- + "according to the pipe-api description.",
+ String.format(
+ "Exception in pipe transfer, subtask: %s, last event: %s, root
cause: %s",
+ taskID, lastEvent,
ErrorHandlingUtils.getRootCause(e).getMessage()),
e);
} else {
- LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.");
+ LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.", e);
releaseLastEvent(false);
}
}
@@ -316,8 +317,9 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
outputPipeConnector.close();
} catch (Exception e) {
LOGGER.info(
- "Error occurred during closing PipeConnector, perhaps need to check
whether the "
- + "implementation of PipeConnector is correct according to the
pipe-api description.",
+ "Exception occurred when closing pipe connector subtask {}, root
cause: {}",
+ taskID,
+ ErrorHandlingUtils.getRootCause(e).getMessage(),
e);
} finally {
inputPendingQueue.forEach(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 3c7de4099ab..149541aa82b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.PipeDataNodeSubtask;
+import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -132,12 +133,12 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask {
} catch (Exception e) {
if (!isClosed.get()) {
throw new PipeException(
- "Error occurred during executing PipeProcessor#process, perhaps
need to check "
- + "whether the implementation of PipeProcessor is correct "
- + "according to the pipe-api description.",
+ String.format(
+ "Exception in pipe process, subtask: %s, last event: %s, root
cause: %s",
+ taskID, lastEvent,
ErrorHandlingUtils.getRootCause(e).getMessage()),
e);
} else {
- LOGGER.info("Exception in pipe event processing, ignored because pipe
is dropped.");
+ LOGGER.info("Exception in pipe event processing, ignored because pipe
is dropped.", e);
releaseLastEvent(false);
}
}
@@ -163,8 +164,9 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask {
pipeProcessor.close();
} catch (Exception e) {
LOGGER.info(
- "Error occurred during closing PipeProcessor, perhaps need to check
whether the "
- + "implementation of PipeProcessor is correct according to the
pipe-api description.",
+ "Exception occurred when closing pipe processor subtask {}, root
cause: {}",
+ taskID,
+ ErrorHandlingUtils.getRootCause(e).getMessage(),
e);
} finally {
outputEventCollector.close();