This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-selector-enhance in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9fe0293bccfbe4ac0d52abcc12b29722db501825 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jun 5 18:28:40 2024 +0800 Pipe: Enhance error handling logic in pipe async connector's handlers --- .../PipeTransferTabletBatchEventHandler.java | 28 ++++++++++++---------- .../PipeTransferTabletInsertionEventHandler.java | 22 +++++++++-------- .../PipeTransferTsFileInsertionEventHandler.java | 18 ++++++++------ 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java index 7407b1b9554..038da978153 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java @@ -121,18 +121,20 @@ public class PipeTransferTabletBatchEventHandler implements AsyncMethodCallback< @Override public void onError(final Exception exception) { - LOGGER.warn( - "Failed to transfer TabletInsertionEvent batch {} (request commit ids={}).", - events.stream() - .map( - event -> - event instanceof EnrichedEvent - ? ((EnrichedEvent) event).coreReportMessage() - : event.toString()) - .collect(Collectors.toList()), - requestCommitIds, - exception); - - connector.addFailureEventsToRetryQueue(events); + try { + LOGGER.warn( + "Failed to transfer TabletInsertionEvent batch {} (request commit ids={}).", + events.stream() + .map( + event -> + event instanceof EnrichedEvent + ? ((EnrichedEvent) event).coreReportMessage() + : event.toString()) + .collect(Collectors.toList()), + requestCommitIds, + exception); + } finally { + connector.addFailureEventsToRetryQueue(events); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java index 94fa6d5ac1a..2e54a8c7c0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java @@ -97,15 +97,17 @@ public abstract class PipeTransferTabletInsertionEventHandler<E extends TPipeTra @Override public void onError(Exception exception) { - LOGGER.warn( - "Failed to transfer TabletInsertionEvent {} (committer key={}, commit id={}).", - event instanceof EnrichedEvent - ? ((EnrichedEvent) event).coreReportMessage() - : event.toString(), - event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitterKey() : null, - event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() : null, - exception); - - connector.addFailureEventToRetryQueue(event); + try { + LOGGER.warn( + "Failed to transfer TabletInsertionEvent {} (committer key={}, commit id={}).", + event instanceof EnrichedEvent + ? ((EnrichedEvent) event).coreReportMessage() + : event.toString(), + event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitterKey() : null, + event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() : null, + exception); + } finally { + connector.addFailureEventToRetryQueue(event); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java index e7e374443c6..a97d87ebdac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java @@ -241,18 +241,22 @@ public class PipeTransferTsFileInsertionEventHandler @Override public void onError(final Exception exception) { - LOGGER.warn( - "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit id {}).", - tsFile, - event.getCommitterKey(), - event.getCommitId(), - exception); + try { + LOGGER.warn( + "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit id {}).", + tsFile, + event.getCommitterKey(), + event.getCommitId(), + exception); + } catch (final Exception e) { + LOGGER.warn("Failed to log error when failed to transfer file.", e); + } try { if (Objects.nonNull(clientManager)) { clientManager.adjustTimeoutIfNecessary(exception); } - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e); }
