This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4de7e691116 Pipe: Enhance error handling logic in pipe async
connector's handlers (#12669)
4de7e691116 is described below
commit 4de7e691116084560bff27376b053c8d244e2988
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 5 18:39:46 2024 +0800
Pipe: Enhance error handling logic in pipe async connector's handlers
(#12669)
---
.../PipeTransferTabletBatchEventHandler.java | 28 ++++++++++++----------
.../PipeTransferTabletInsertionEventHandler.java | 22 +++++++++--------
.../PipeTransferTsFileInsertionEventHandler.java | 18 ++++++++------
3 files changed, 38 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 7407b1b9554..038da978153 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -121,18 +121,20 @@ public class PipeTransferTabletBatchEventHandler
implements AsyncMethodCallback<
@Override
public void onError(final Exception exception) {
- LOGGER.warn(
- "Failed to transfer TabletInsertionEvent batch {} (request commit
ids={}).",
- events.stream()
- .map(
- event ->
- event instanceof EnrichedEvent
- ? ((EnrichedEvent) event).coreReportMessage()
- : event.toString())
- .collect(Collectors.toList()),
- requestCommitIds,
- exception);
-
- connector.addFailureEventsToRetryQueue(events);
+ try {
+ LOGGER.warn(
+ "Failed to transfer TabletInsertionEvent batch {} (request commit
ids={}).",
+ events.stream()
+ .map(
+ event ->
+ event instanceof EnrichedEvent
+ ? ((EnrichedEvent) event).coreReportMessage()
+ : event.toString())
+ .collect(Collectors.toList()),
+ requestCommitIds,
+ exception);
+ } finally {
+ connector.addFailureEventsToRetryQueue(events);
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 94fa6d5ac1a..2e54a8c7c0b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -97,15 +97,17 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
@Override
public void onError(Exception exception) {
- LOGGER.warn(
- "Failed to transfer TabletInsertionEvent {} (committer key={}, commit
id={}).",
- event instanceof EnrichedEvent
- ? ((EnrichedEvent) event).coreReportMessage()
- : event.toString(),
- event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
- event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId()
: null,
- exception);
-
- connector.addFailureEventToRetryQueue(event);
+ try {
+ LOGGER.warn(
+ "Failed to transfer TabletInsertionEvent {} (committer key={},
commit id={}).",
+ event instanceof EnrichedEvent
+ ? ((EnrichedEvent) event).coreReportMessage()
+ : event.toString(),
+ event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
+ event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitId() : null,
+ exception);
+ } finally {
+ connector.addFailureEventToRetryQueue(event);
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index e7e374443c6..a97d87ebdac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -241,18 +241,22 @@ public class PipeTransferTsFileInsertionEventHandler
@Override
public void onError(final Exception exception) {
- LOGGER.warn(
- "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit
id {}).",
- tsFile,
- event.getCommitterKey(),
- event.getCommitId(),
- exception);
+ try {
+ LOGGER.warn(
+ "Failed to transfer TsFileInsertionEvent {} (committer key {},
commit id {}).",
+ tsFile,
+ event.getCommitterKey(),
+ event.getCommitId(),
+ exception);
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to log error when failed to transfer file.", e);
+ }
try {
if (Objects.nonNull(clientManager)) {
clientManager.adjustTimeoutIfNecessary(exception);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
}