This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch print-pipe-error
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 14453dd5d701cc8671344cb04921ea93efb12eb6
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jan 16 17:42:07 2024 +0800

    Pipe: print more info about event and task when error occurs in pipe subtask
---
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  | 41 +++++++++++++++++++---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 13 +++----
 .../common/tablet/PipeRawTabletInsertionEvent.java | 11 ++++++
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 19 +++++-----
 .../subtask/connector/PipeConnectorSubtask.java    | 16 +++++----
 .../subtask/processor/PipeProcessorSubtask.java    | 14 ++++----
 6 files changed, 80 insertions(+), 34 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 1f7b794327b..a2a98f78ad0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -41,16 +41,16 @@ public abstract class EnrichedEvent implements Event {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(EnrichedEvent.class);
 
-  private final AtomicInteger referenceCount;
+  protected final AtomicInteger referenceCount;
 
   protected final String pipeName;
   protected final PipeTaskMeta pipeTaskMeta;
 
-  private String committerKey;
+  protected String committerKey;
   public static final long NO_COMMIT_ID = -1;
-  private long commitId = NO_COMMIT_ID;
+  protected long commitId = NO_COMMIT_ID;
 
-  private final String pattern;
+  protected final String pattern;
 
   protected final long startTime;
   protected final long endTime;
@@ -58,7 +58,7 @@ public abstract class EnrichedEvent implements Event {
   protected boolean isPatternParsed;
   protected boolean isTimeParsed;
 
-  private boolean shouldReportOnCommit = false;
+  protected boolean shouldReportOnCommit = false;
 
   protected EnrichedEvent(
       String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
@@ -245,4 +245,35 @@ public abstract class EnrichedEvent implements Event {
       reportProgress();
     }
   }
+
+  @Override
+  public String toString() {
+    return "EnrichedEvent{"
+        + "referenceCount="
+        + referenceCount.get()
+        + ", pipeName='"
+        + pipeName
+        + '\''
+        + ", pipeTaskMeta="
+        + pipeTaskMeta
+        + ", committerKey='"
+        + committerKey
+        + '\''
+        + ", commitId="
+        + commitId
+        + ", pattern='"
+        + pattern
+        + '\''
+        + ", startTime="
+        + startTime
+        + ", endTime="
+        + endTime
+        + ", isPatternParsed="
+        + isPatternParsed
+        + ", isTimeParsed="
+        + isTimeParsed
+        + ", shouldReportOnCommit="
+        + shouldReportOnCommit
+        + '}';
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 68d8049a81f..98ab1c7b2ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -249,13 +249,10 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
 
   @Override
   public String toString() {
-    return "PipeInsertNodeTabletInsertionEvent{"
-        + "walEntryHandler="
-        + walEntryHandler
-        + ", progressIndex="
-        + progressIndex
-        + ", isAligned="
-        + isAligned
-        + '}';
+    return String.format(
+            "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s, 
progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, dataContainer=%s}",
+            walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, 
dataContainer)
+        + " - "
+        + super.toString();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 4e30a6b1f68..03df841181c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -208,4 +208,15 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
   public boolean hasNoNeedParsingAndIsEmpty() {
     return !shouldParsePatternOrTime() && tablet.rowSize == 0;
   }
+
+  /////////////////////////// Object ///////////////////////////
+
+  @Override
+  public String toString() {
+    return String.format(
+            "PipeRawTabletInsertionEvent{tablet=%s, isAligned=%s, 
sourceEvent=%s, needToReport=%s, allocatedMemoryBlock=%s, dataContainer=%s}",
+            tablet, isAligned, sourceEvent, needToReport, 
allocatedMemoryBlock, dataContainer)
+        + " - "
+        + super.toString();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 473c922f15f..ee2f442e104 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -222,13 +222,16 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
 
   @Override
   public String toString() {
-    return "PipeTsFileInsertionEvent{"
-        + "resource="
-        + resource
-        + ", tsFile="
-        + tsFile
-        + ", isClosed="
-        + isClosed
-        + '}';
+    return String.format(
+            "PipeTsFileInsertionEvent{isTsFileFormatValid=%s, resource=%s, 
tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s, dataContainer=%s}",
+            isTsFileFormatValid,
+            resource,
+            tsFile,
+            isLoaded,
+            isGeneratedByPipe,
+            isClosed.get(),
+            dataContainer)
+        + " - "
+        + super.toString();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 2b8f1020a2c..184c706be1c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -150,18 +150,19 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
       if (!isClosed.get()) {
         throw e;
       } else {
-        LOGGER.info("PipeConnectionException in pipe transfer, ignored because 
pipe is dropped.");
+        LOGGER.info(
+            "PipeConnectionException in pipe transfer, ignored because pipe is 
dropped.", e);
         releaseLastEvent(false);
       }
     } catch (Exception e) {
       if (!isClosed.get()) {
         throw new PipeException(
-            "Error occurred during executing PipeConnector#transfer, perhaps 
need to check "
-                + "whether the implementation of PipeConnector is correct "
-                + "according to the pipe-api description.",
+            String.format(
+                "Exception in pipe transfer, subtask: %s, last event: %s, root 
cause: %s",
+                taskID, lastEvent, 
ErrorHandlingUtils.getRootCause(e).getMessage()),
             e);
       } else {
-        LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.");
+        LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.", e);
         releaseLastEvent(false);
       }
     }
@@ -316,8 +317,9 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
       outputPipeConnector.close();
     } catch (Exception e) {
       LOGGER.info(
-          "Error occurred during closing PipeConnector, perhaps need to check 
whether the "
-              + "implementation of PipeConnector is correct according to the 
pipe-api description.",
+          "Exception occurred when closing pipe connector subtask {}, root 
cause: {}",
+          taskID,
+          ErrorHandlingUtils.getRootCause(e).getMessage(),
           e);
     } finally {
       inputPendingQueue.forEach(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 3c7de4099ab..149541aa82b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.task.subtask.PipeDataNodeSubtask;
+import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -132,12 +133,12 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask {
     } catch (Exception e) {
       if (!isClosed.get()) {
         throw new PipeException(
-            "Error occurred during executing PipeProcessor#process, perhaps 
need to check "
-                + "whether the implementation of PipeProcessor is correct "
-                + "according to the pipe-api description.",
+            String.format(
+                "Exception in pipe process, subtask: %s, last event: %s, root 
cause: %s",
+                taskID, lastEvent, 
ErrorHandlingUtils.getRootCause(e).getMessage()),
             e);
       } else {
-        LOGGER.info("Exception in pipe event processing, ignored because pipe 
is dropped.");
+        LOGGER.info("Exception in pipe event processing, ignored because pipe 
is dropped.", e);
         releaseLastEvent(false);
       }
     }
@@ -163,8 +164,9 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask {
       pipeProcessor.close();
     } catch (Exception e) {
       LOGGER.info(
-          "Error occurred during closing PipeProcessor, perhaps need to check 
whether the "
-              + "implementation of PipeProcessor is correct according to the 
pipe-api description.",
+          "Exception occurred when closing pipe processor subtask {}, root 
cause: {}",
+          taskID,
+          ErrorHandlingUtils.getRootCause(e).getMessage(),
           e);
     } finally {
       outputEventCollector.close();

Reply via email to