This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/connection-retry by this push:
new 742db047741 fix
742db047741 is described below
commit 742db0477418d3bea5a0db5451290c171752a268
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 10:03:26 2026 +0800
fix
---
.../thrift/async/IoTDBDataRegionAsyncSink.java | 35 ++++++++++++++--------
.../PipeTransferTabletBatchEventHandler.java | 2 +-
.../PipeTransferTabletInsertionEventHandler.java | 2 +-
.../async/handler/PipeTransferTsFileHandler.java | 2 +-
4 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 7b7ce628f82..2e1d630eeb1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.audit.UserEntity;
+import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -59,6 +60,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -262,10 +264,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
false,
sealedFile.left));
}
- } catch (final Throwable t) {
- LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, t);
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, e);
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- addFailureEventsToRetryQueue(events);
+ addFailureEventsToRetryQueue(events, e);
}
}
} else {
@@ -611,14 +613,17 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
}
if (remainingEvents <= retryEventQueue.size() +
retryTsFileQueue.size()) {
- throw new PipeException(
+ final String message =
"Failed to retry transferring events in the retry queue.
Remaining events: "
+ (retryEventQueue.size() + retryTsFileQueue.size())
+ " (tablet events: "
+ retryEventQueueEventCounter.getTabletInsertionEventCount()
+ ", tsfile events: "
+ retryEventQueueEventCounter.getTsFileInsertionEventCount()
- + ").");
+ + ").";
+ throw isConnectionException
+ ? new PipeConnectionException(message)
+ : new PipeException(message);
}
}
}
@@ -634,7 +639,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
}
} catch (final Exception e) {
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, e);
}
return;
}
@@ -647,14 +652,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
}
} else {
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, null);
}
} catch (final Exception e) {
if (tabletInsertionEvent instanceof EnrichedEvent) {
((EnrichedEvent) tabletInsertionEvent)
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
false);
}
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, e);
}
}
@@ -664,11 +669,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
tsFileInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionAsyncSink.class.getName(), false);
} else {
- addFailureEventToRetryQueue(tsFileInsertionEvent);
+ addFailureEventToRetryQueue(tsFileInsertionEvent, null);
}
} catch (final Exception e) {
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
false);
- addFailureEventToRetryQueue(tsFileInsertionEvent);
+ addFailureEventToRetryQueue(tsFileInsertionEvent, e);
}
}
@@ -678,7 +683,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
* @param event {@link Event} to retry
*/
@SuppressWarnings("java:S899")
- public void addFailureEventToRetryQueue(final Event event) {
+ public void addFailureEventToRetryQueue(final Event event, final Exception
e) {
+ if (e instanceof PipeConnectionException ||
ThriftClient.isConnectionBroken(e)) {
+ isConnectionException = true;
+ }
if (event instanceof EnrichedEvent && ((EnrichedEvent)
event).isReleased()) {
return;
}
@@ -714,8 +722,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
*
* @param events {@link EnrichedEvent}s to retry
*/
- public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent>
events) {
- events.forEach(this::addFailureEventToRetryQueue);
+ public void addFailureEventsToRetryQueue(
+ final Iterable<EnrichedEvent> events, final Exception e) {
+ events.forEach(event -> addFailureEventToRetryQueue(event, e));
}
public boolean isEnableSendTsFileLimit() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 89baaa02794..d95513a2228 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -123,7 +123,7 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
events.size(),
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
} finally {
- connector.addFailureEventsToRetryQueue(events);
+ connector.addFailureEventsToRetryQueue(events, exception);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index df26325a5ca..ca92af92b19 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -98,7 +98,7 @@ public abstract class PipeTransferTabletInsertionEventHandler
extends PipeTransf
event.getCommitterKey(),
event.getCommitId());
} finally {
- connector.addFailureEventToRetryQueue(event);
+ connector.addFailureEventToRetryQueue(event, exception);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 67fa5c9bd8c..6726c9a6701 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -415,7 +415,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
returnClientIfNecessary();
} finally {
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- connector.addFailureEventsToRetryQueue(events);
+ connector.addFailureEventsToRetryQueue(events, exception);
}
}
}