This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix-pipe-receiver-backpressure
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b20b48710310fc28101825a2d925e19058fdad1e
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 12 15:20:18 2026 +0800

    Throttle async pipe sink on receiver reject
---
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 116 +++++++++++++++++++++
 .../handler/PipeTransferTrackableHandler.java      |  56 +++++++---
 .../handler/PipeTransferTrackableHandlerTest.java  |  38 ++++++-
 3 files changed, 193 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index b8b169b1f6a..a830c9994a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
@@ -63,6 +64,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import com.google.common.collect.ImmutableSet;
@@ -86,6 +88,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
@@ -131,6 +134,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       new ConcurrentHashMap<>();
 
   private final Set<CommitterKey> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
+  private final Map<String, ReceiverTemporaryUnavailableBackoff> 
receiverBackoffMap =
+      new ConcurrentHashMap<>();
 
   private boolean enableSendTsFileLimit;
   private volatile boolean isConnectionException;
@@ -756,6 +761,83 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     return enableSendTsFileLimit;
   }
 
+  /**
+   * Blocks the calling thread until the backoff window recorded for {@code endPoint} has elapsed,
+   * the sink is closed, or the thread is interrupted (in which case the interrupt flag is
+   * restored). Returns immediately when no backoff entry exists for the endpoint.
+   *
+   * <p>The wait is split into bounded slices so that closing the sink is noticed promptly rather
+   * than only after the whole (possibly maximal) backoff window. The backoff entry is re-read on
+   * every slice so that an entry removed or replaced by {@code recordReceiverStatus} is honored.
+   *
+   * <p>NOTE(review): one caller invokes this right before re-sending from a response-handling
+   * path; if that path runs on the async client's callback thread, sleeping here stalls it —
+   * confirm this is acceptable.
+   *
+   * @param endPoint the receiver about to be written to; {@code null} is ignored
+   */
+  public void waitIfReceiverTemporarilyUnavailable(final TEndPoint endPoint) {
+    final String endPointKey = format(endPoint);
+    if (Objects.isNull(endPointKey)) {
+      return;
+    }
+
+    // Upper bound of a single sleep, so that a close of the sink is observed within this delay.
+    final long maxSleepSliceInMs = 100L;
+    while (!isClosed.get()) {
+      final ReceiverTemporaryUnavailableBackoff backoff = receiverBackoffMap.get(endPointKey);
+      if (Objects.isNull(backoff)) {
+        return;
+      }
+
+      final long waitTimeInMs = backoff.getRemainingWaitTimeInMs();
+      if (waitTimeInMs <= 0) {
+        return;
+      }
+
+      try {
+        Thread.sleep(Math.min(waitTimeInMs, maxSleepSliceInMs));
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Updates the per-endpoint backoff state from a receiver response.
+   *
+   * <p>How the status is handled:
+   *
+   * <ul>
+   *   <li>If the status, or any nested sub-status, reports the receiver as temporarily
+   *       unavailable, the backoff window is extended and the next backoff is escalated.
+   *   <li>If it is a success or redirection status, the endpoint's entry is dropped, but only once
+   *       its window has elapsed.
+   *   <li>Any other status leaves the state unchanged.
+   * </ul>
+   *
+   * <p>Both updates run inside per-key atomic {@link ConcurrentHashMap} operations. This prevents
+   * a concurrent success from removing an entry that a concurrent rejection has just extended.
+   *
+   * @param endPoint the receiver that produced {@code status}; {@code null} is ignored
+   * @param status the receiver's response status; {@code null} is ignored
+   */
+  public void recordReceiverStatus(final TEndPoint endPoint, final TSStatus status) {
+    final String endPointKey = format(endPoint);
+    if (Objects.isNull(endPointKey) || Objects.isNull(status)) {
+      return;
+    }
+
+    if (isReceiverTemporarilyUnavailable(status)) {
+      // Holder for the backoff applied inside the atomic update, used only for logging.
+      final long[] backoffTimeInMs = new long[1];
+      receiverBackoffMap.compute(
+          endPointKey,
+          (key, backoff) -> {
+            final ReceiverTemporaryUnavailableBackoff target =
+                Objects.isNull(backoff) ? new ReceiverTemporaryUnavailableBackoff() : backoff;
+            backoffTimeInMs[0] = target.markTemporarilyUnavailable();
+            return target;
+          });
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Receiver {} is temporarily unavailable, throttle requests for {} ms. Status: {}",
+            endPointKey,
+            backoffTimeInMs[0],
+            status);
+      }
+    } else if (isSuccess(status)) {
+      // Drop the entry only once its window has elapsed; returning null removes the mapping.
+      receiverBackoffMap.computeIfPresent(
+          endPointKey,
+          (key, backoff) -> backoff.getRemainingWaitTimeInMs() <= 0 ? null : backoff);
+    }
+  }
+
+  private static boolean isReceiverTemporarilyUnavailable(final TSStatus 
status) {
+    if (Objects.isNull(status)) {
+      return false;
+    }
+
+    final int statusCode = status.getCode();
+    if (statusCode == 
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()
+        || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+      return true;
+    }
+
+    return status.isSetSubStatus()
+        && status.getSubStatus().stream()
+            
.anyMatch(IoTDBDataRegionAsyncSink::isReceiverTemporarilyUnavailable);
+  }
+
+  private static boolean isSuccess(final TSStatus status) {
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || status.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
+  }
+
+  /**
+   * Builds the {@code ip:port} key used to index {@code receiverBackoffMap}.
+   *
+   * @param endPoint the endpoint; may be {@code null}
+   * @return {@code "ip:port"}, or {@code null} when {@code endPoint} is {@code null}
+   */
+  private static String format(final TEndPoint endPoint) {
+    if (endPoint == null) {
+      return null;
+    }
+    return endPoint.getIp() + ":" + endPoint.getPort();
+  }
+
   //////////////////////////// Operations for close 
////////////////////////////
 
   @Override
@@ -830,6 +912,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     // clear reference count of events in retry queue after closing async 
client
     clearRetryEventsReferenceCount();
     droppedPipeTaskKeys.clear();
+    receiverBackoffMap.clear();
 
     super.close();
   }
@@ -931,4 +1014,37 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
     }
   }
+
+  /**
+   * Per-receiver exponential backoff state.
+   *
+   * <p>Each rejection sets the earliest allowed send time to "now + current backoff" and then
+   * doubles the backoff. The backoff is capped at {@code
+   * PipeConfig#getPipeSinkSubtaskSleepIntervalMaxMs()}, clamped to be non-negative. The initial
+   * backoff is {@code PipeConfig#getPipeSinkSubtaskSleepIntervalInitMs()}, clamped to {@code [0,
+   * max]}.
+   *
+   * <p>Deadlines use the monotonic {@link System#nanoTime()} so that wall-clock adjustments cannot
+   * stretch or shorten a backoff window.
+   */
+  private static class ReceiverTemporaryUnavailableBackoff {
+
+    private static final long NANOS_PER_MILLI = 1_000_000L;
+
+    private final long maxBackoffTimeInMs =
+        Math.max(0, PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs());
+    private final AtomicLong currentBackoffTimeInMs =
+        new AtomicLong(
+            Math.min(
+                Math.max(0, PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()),
+                maxBackoffTimeInMs));
+    // Monotonic deadline; starts at "now" so a fresh entry imposes no wait by itself.
+    private final AtomicLong nextAvailableTimeInNs = new AtomicLong(System.nanoTime());
+
+    /**
+     * Extends the backoff window by the current backoff and escalates the backoff for the next
+     * rejection.
+     *
+     * @return the backoff in milliseconds applied by this call
+     */
+    private long markTemporarilyUnavailable() {
+      // Take the current level and advance it atomically, so that concurrent rejections each
+      // apply the level they actually consumed.
+      final long backoffTimeInMs =
+          currentBackoffTimeInMs.getAndUpdate(this::getNextBackoffTimeInMs);
+      final long candidateInNs = System.nanoTime() + backoffTimeInMs * NANOS_PER_MILLI;
+      // Keep the later deadline; nanoTime values must be compared via their difference.
+      nextAvailableTimeInNs.updateAndGet(
+          current -> current - candidateInNs >= 0 ? current : candidateInNs);
+      return backoffTimeInMs;
+    }
+
+    /** Returns the milliseconds left in the backoff window; {@code <= 0} once it has elapsed. */
+    private long getRemainingWaitTimeInMs() {
+      return (nextAvailableTimeInNs.get() - System.nanoTime()) / NANOS_PER_MILLI;
+    }
+
+    /** Returns the backoff following {@code backoffTimeInMs}: doubled, capped at the maximum. */
+    private long getNextBackoffTimeInMs(final long backoffTimeInMs) {
+      if (backoffTimeInMs <= 0 || backoffTimeInMs >= maxBackoffTimeInMs) {
+        return maxBackoffTimeInMs;
+      }
+      // If doubling would reach the cap, clamp instead (this also avoids overflowing the shift).
+      return backoffTimeInMs >= maxBackoffTimeInMs - backoffTimeInMs
+          ? maxBackoffTimeInMs
+          : backoffTimeInMs << 1;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index d543e736743..61d31eaa7cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -51,6 +51,10 @@ public abstract class PipeTransferTrackableHandler
 
   @Override
   public void onComplete(final TPipeTransferResp response) {
+    if (Objects.nonNull(client) && Objects.nonNull(response)) {
+      sink.recordReceiverStatus(client.getEndPoint(), response.getStatus());
+    }
+
     if (sink.isClosed()) {
       clearEventsReferenceCount();
       sink.eliminateHandler(this, true);
@@ -100,28 +104,40 @@ public abstract class PipeTransferTrackableHandler
     }
     // track handler before checking if connector is closed
     sink.trackHandler(this);
-    if (sink.isClosed()) {
-      clearEventsReferenceCount();
-      sink.eliminateHandler(this, true);
-      client.setShouldReturnSelf(true);
-      client.returnSelf(
-          (e) -> {
-            if (e instanceof IllegalStateException) {
-              PipeLogger.log(
-                  ignored ->
-                      
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO),
-                  "Illegal state when return the client to object pool, maybe 
the pool is already cleared. Will ignore.");
-              return true;
-            }
-            return false;
-          });
-      this.client = null;
+    if (returnFalseIfSinkIsClosed(client)) {
+      return false;
+    }
+    sink.waitIfReceiverTemporarilyUnavailable(client.getEndPoint());
+    if (returnFalseIfSinkIsClosed(client)) {
       return false;
     }
     doTransfer(client, req);
     return true;
   }
 
+  /**
+   * Releases this handler's resources if the sink has been closed.
+   *
+   * <p>Despite its name, this method returns {@code true} when the sink is closed, meaning the
+   * caller should abort the transfer. In that case it:
+   *
+   * <ul>
+   *   <li>clears the events' reference counts;
+   *   <li>eliminates the handler from the sink;
+   *   <li>returns {@code client} to its pool, ignoring an {@link IllegalStateException} raised by
+   *       an already-cleared pool;
+   *   <li>drops the handler's client reference.
+   * </ul>
+   *
+   * @param client the client that would have carried the transfer
+   * @return {@code true} if the sink is closed and cleanup was performed, {@code false} otherwise
+   */
+  private boolean returnFalseIfSinkIsClosed(final AsyncPipeDataTransferServiceClient client) {
+    if (sink.isClosed()) {
+      clearEventsReferenceCount();
+      sink.eliminateHandler(this, true);
+      client.setShouldReturnSelf(true);
+      client.returnSelf(
+          e -> {
+            if (!(e instanceof IllegalStateException)) {
+              return false;
+            }
+            PipeLogger.log(
+                ignored ->
+                    LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO),
+                "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
+            return true;
+          });
+      this.client = null;
+      return true;
+    }
+    return false;
+  }
+
   /**
    * @return {@code true} if all transmissions corresponding to the handler 
have been completed,
    *     {@code false} otherwise
@@ -190,6 +206,10 @@ public abstract class PipeTransferTrackableHandler
               return;
             }
 
+            if (Objects.nonNull(response)) {
+              sink.recordReceiverStatus(client.getEndPoint(), 
response.getStatus());
+            }
+
             if (response == null) {
               fallbackToWholeRequest(
                   client,
@@ -255,6 +275,10 @@ public abstract class PipeTransferTrackableHandler
 
     try {
       client.setShouldReturnSelf(shouldReturnSelf);
+      sink.waitIfReceiverTemporarilyUnavailable(client.getEndPoint());
+      if (returnFalseIfSinkIsClosed(client)) {
+        return;
+      }
       client.pipeTransfer(originalReq, this);
     } catch (final Exception e) {
       PipeTransferTrackableHandler.this.onError(e);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
index 60b69235085..8e0f7802998 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.conf.CommonConfig;
@@ -38,6 +39,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
@@ -154,6 +156,36 @@ public class PipeTransferTrackableHandlerTest {
     Assert.assertEquals(0, handler.errorCount);
   }
 
+  /**
+   * Checks that {@code transfer} consults the sink's receiver backoff before issuing the RPC.
+   * Also checks that the status of the (synchronously completed) response is reported back to the
+   * sink. The sink is a mock, so this verifies call order only; no real waiting is measured.
+   */
+  @Test
+  public void testTransferWaitsForReceiverBackoffAndRecordsStatus() throws Exception {
+    final TEndPoint receiver = new TEndPoint("127.0.0.1", 6667);
+    final TSStatus unavailable = new TSStatus();
+    unavailable.setCode(
+        TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode());
+
+    final IoTDBDataRegionAsyncSink sink = Mockito.mock(IoTDBDataRegionAsyncSink.class);
+    final AsyncPipeDataTransferServiceClient client =
+        Mockito.mock(AsyncPipeDataTransferServiceClient.class);
+    Mockito.when(client.getEndPoint()).thenReturn(receiver);
+    // Complete the RPC immediately with the "temporarily unavailable" status.
+    Mockito.doAnswer(
+            call -> {
+              final AsyncMethodCallback<TPipeTransferResp> callback = call.getArgument(1);
+              callback.onComplete(resp(unavailable));
+              return null;
+            })
+        .when(client)
+        .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());
+
+    new TestPipeTransferTrackableHandler(sink).transfer(client, createReq(1));
+
+    final InOrder order = Mockito.inOrder(sink, client);
+    order.verify(sink).waitIfReceiverTemporarilyUnavailable(receiver);
+    order.verify(client).pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());
+    Mockito.verify(sink).recordReceiverStatus(receiver, unavailable);
+  }
+
   private static TPipeTransferReq createReq(final int bodySize) {
     final byte[] body = new byte[bodySize];
     for (int i = 0; i < body.length; ++i) {
@@ -168,8 +200,12 @@ public class PipeTransferTrackableHandlerTest {
   }
 
   private static TPipeTransferResp successResp() {
+    return resp(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  private static TPipeTransferResp resp(final TSStatus status) {
     final TPipeTransferResp resp = new TPipeTransferResp();
-    resp.setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    resp.setStatus(status);
     return resp;
   }
 

Reply via email to