This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4de7e691116 Pipe: Enhance error handling logic in pipe async 
connector's handlers (#12669)
4de7e691116 is described below

commit 4de7e691116084560bff27376b053c8d244e2988
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 5 18:39:46 2024 +0800

    Pipe: Enhance error handling logic in pipe async connector's handlers 
(#12669)
---
 .../PipeTransferTabletBatchEventHandler.java       | 28 ++++++++++++----------
 .../PipeTransferTabletInsertionEventHandler.java   | 22 +++++++++--------
 .../PipeTransferTsFileInsertionEventHandler.java   | 18 ++++++++------
 3 files changed, 38 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 7407b1b9554..038da978153 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -121,18 +121,20 @@ public class PipeTransferTabletBatchEventHandler 
implements AsyncMethodCallback<
 
   @Override
   public void onError(final Exception exception) {
-    LOGGER.warn(
-        "Failed to transfer TabletInsertionEvent batch {} (request commit 
ids={}).",
-        events.stream()
-            .map(
-                event ->
-                    event instanceof EnrichedEvent
-                        ? ((EnrichedEvent) event).coreReportMessage()
-                        : event.toString())
-            .collect(Collectors.toList()),
-        requestCommitIds,
-        exception);
-
-    connector.addFailureEventsToRetryQueue(events);
+    try {
+      LOGGER.warn(
+          "Failed to transfer TabletInsertionEvent batch {} (request commit 
ids={}).",
+          events.stream()
+              .map(
+                  event ->
+                      event instanceof EnrichedEvent
+                          ? ((EnrichedEvent) event).coreReportMessage()
+                          : event.toString())
+              .collect(Collectors.toList()),
+          requestCommitIds,
+          exception);
+    } finally {
+      connector.addFailureEventsToRetryQueue(events);
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 94fa6d5ac1a..2e54a8c7c0b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -97,15 +97,17 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
 
   @Override
   public void onError(Exception exception) {
-    LOGGER.warn(
-        "Failed to transfer TabletInsertionEvent {} (committer key={}, commit 
id={}).",
-        event instanceof EnrichedEvent
-            ? ((EnrichedEvent) event).coreReportMessage()
-            : event.toString(),
-        event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
-        event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() 
: null,
-        exception);
-
-    connector.addFailureEventToRetryQueue(event);
+    try {
+      LOGGER.warn(
+          "Failed to transfer TabletInsertionEvent {} (committer key={}, 
commit id={}).",
+          event instanceof EnrichedEvent
+              ? ((EnrichedEvent) event).coreReportMessage()
+              : event.toString(),
+          event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
+          event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitId() : null,
+          exception);
+    } finally {
+      connector.addFailureEventToRetryQueue(event);
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index e7e374443c6..a97d87ebdac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -241,18 +241,22 @@ public class PipeTransferTsFileInsertionEventHandler
 
   @Override
   public void onError(final Exception exception) {
-    LOGGER.warn(
-        "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit 
id {}).",
-        tsFile,
-        event.getCommitterKey(),
-        event.getCommitId(),
-        exception);
+    try {
+      LOGGER.warn(
+          "Failed to transfer TsFileInsertionEvent {} (committer key {}, 
commit id {}).",
+          tsFile,
+          event.getCommitterKey(),
+          event.getCommitId(),
+          exception);
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to log error when failed to transfer file.", e);
+    }
 
     try {
       if (Objects.nonNull(clientManager)) {
         clientManager.adjustTimeoutIfNecessary(exception);
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
     }
 

Reply via email to