This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch air-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/air-13 by this push:
     new 7dbdac4099c Pipe: Enabled retry locally for air gap receiver & temporary unavailable exception (#17188)
7dbdac4099c is described below

commit 7dbdac4099cf32af2b8d5ea1e45c0d298572cae6
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 10 09:29:48 2026 +0800

    Pipe: Enabled retry locally for air gap receiver & temporary unavailable exception (#17188)
    
    * fix
    
    * some-part
    
    * Update IoTDBAirGapReceiver.java
    
    * fix
    
    * mipl
    
    * logger
---
 .../protocol/airgap/IoTDBAirGapReceiver.java       | 60 ++++++++++++++--------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 52 +++++++++++++++----
 .../iotdb/commons/pipe/config/PipeConfig.java      |  8 +++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 11 +++-
 4 files changed, 99 insertions(+), 32 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 610b9e5fe1a..0ff1834e8cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -133,27 +133,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
                   .setVersion(ReadWriteIOUtils.readByte(byteBuffer))
                   .setType(ReadWriteIOUtils.readShort(byteBuffer))
                   .setBody(byteBuffer.slice());
-      final TPipeTransferResp resp = agent.receive(req);
-
-      final TSStatus status = resp.getStatus();
-      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        ok();
-      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
-          || status.getCode()
-              == TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
-        LOGGER.info(
-            "Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.",
-            receiverId,
-            resp.getStatus());
-        ok();
-      } else {
-        LOGGER.warn(
-            "Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
-            receiverId,
-            resp.getStatus(),
-            req);
-        fail();
-      }
+      handleReq(req, System.currentTimeMillis());
     } catch (final PipeConnectionException e) {
       LOGGER.info(
           "Pipe air gap receiver {}: Socket {} closed when listening to data. Because: {}",
@@ -171,6 +151,44 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
     }
   }
 
+  private void handleReq(final AirGapPseudoTPipeTransferRequest req, final long startTime)
+      throws IOException {
+    final TPipeTransferResp resp = agent.receive(req);
+
+    final TSStatus status = resp.getStatus();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      ok();
+    } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || status.getCode()
+            == TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+      LOGGER.info(
+          "Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.",
+          receiverId,
+          resp.getStatus());
+      ok();
+    } else if (status.getCode()
+        == TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+      try {
+        Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      LOGGER.info(
+          "Temporary unavailable exception encountered at air gap receiver, will retry locally.");
+      if (System.currentTimeMillis() - startTime
+          < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
+        handleReq(req, startTime);
+      }
+    } else {
+      LOGGER.warn(
+          "Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
+          receiverId,
+          resp.getStatus(),
+          req);
+      fail();
+    }
+  }
+
   private void ok() throws IOException {
     final OutputStream outputStream = socket.getOutputStream();
     outputStream.write(AirGapOneByteResponse.OK);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 62c389df758..ddaa5f8ab07 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProper
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
 import org.apache.iotdb.commons.enums.PipeRateAverage;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -275,7 +276,7 @@ public class CommonConfig {
   private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
   private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
   private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
-  private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
+  private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500;
   private int pipeAsyncConnectorSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
   private int pipeAsyncConnectorMaxClientNumber =
@@ -301,6 +302,9 @@ public class CommonConfig {
   private boolean pipeAirGapReceiverEnabled = false;
   private int pipeAirGapReceiverPort = 9780;
 
+  private long pipeAirGapRetryLocalIntervalMs = 1000L;
+  private long pipeAirGapRetryMaxMs = -1;
+
   private long pipeReceiverLoginPeriodicVerificationIntervalMs = -1;
   private double pipeReceiverActualToEstimatedMemoryRatio = 3;
 
@@ -1126,21 +1130,20 @@ public class CommonConfig {
     return pipeAsyncSinkForcedRetryTotalEventQueueSize;
   }
 
-  public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
-      long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
-    if (this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall
-        == pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
+  public void setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall(
+      long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall) {
+    if (this.pipeAsyncSinkMaxRetryExecutionTimeMsPerCall
+        == pipeAsyncSinkMaxRetryExecutionTimeMsPerCall) {
       return;
     }
-    this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
-        pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+    this.pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = pipeAsyncSinkMaxRetryExecutionTimeMsPerCall;
     logger.info(
-        "pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall is set to {}.",
-        pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
+        "pipeAsyncSinkMaxRetryExecutionTimeMsPerCall is set to {}.",
+        pipeAsyncSinkMaxRetryExecutionTimeMsPerCall);
   }
 
   public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
-    return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+    return pipeAsyncSinkMaxRetryExecutionTimeMsPerCall;
   }
 
   public int getPipeAsyncSinkSelectorNumber() {
@@ -1558,6 +1561,35 @@ public class CommonConfig {
     return pipeAirGapReceiverPort;
   }
 
+  public long getPipeAirGapRetryLocalIntervalMs() {
+    return pipeAirGapRetryLocalIntervalMs;
+  }
+
+  public void setPipeAirGapRetryLocalIntervalMs(long pipeAirGapRetryLocalIntervalMs) {
+    if (pipeAirGapRetryLocalIntervalMs == this.pipeAirGapRetryLocalIntervalMs) {
+      return;
+    }
+    this.pipeAirGapRetryLocalIntervalMs = pipeAirGapRetryLocalIntervalMs;
+    logger.info("pipeAirGapRetryLocalIntervalMs is set to {}.", pipeAirGapRetryLocalIntervalMs);
+  }
+
+  // < 0 : 0.8 * transfer timeout to avoid timeout
+  // = 0 : Disable retry
+  // > 0 : Explicit configuration
+  public long getPipeAirGapRetryMaxMs() {
+    return pipeAirGapRetryMaxMs >= 0
+        ? pipeAirGapRetryMaxMs
+        : (long) (PipeConfig.getInstance().getPipeSinkTransferTimeoutMs() * 0.8);
+  }
+
+  public void setPipeAirGapRetryMaxMs(long pipeAirGapRetryMaxMs) {
+    if (pipeAirGapRetryMaxMs == this.pipeAirGapRetryMaxMs) {
+      return;
+    }
+    this.pipeAirGapRetryMaxMs = pipeAirGapRetryMaxMs;
+    logger.info("pipeAirGapRetryMaxMs is set to {}.", pipeAirGapRetryMaxMs);
+  }
+
   public void setPipeReceiverLoginPeriodicVerificationIntervalMs(
       long pipeReceiverLoginPeriodicVerificationIntervalMs) {
     if (this.pipeReceiverLoginPeriodicVerificationIntervalMs
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index fbf4cf191ac..88dfaf50948 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -335,6 +335,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAirGapReceiverPort();
   }
 
+  public long getPipeAirGapRetryLocalIntervalMs() {
+    return COMMON_CONFIG.getPipeAirGapRetryLocalIntervalMs();
+  }
+
+  public long getPipeAirGapRetryMaxMs() {
+    return COMMON_CONFIG.getPipeAirGapRetryMaxMs();
+  }
+
   /////////////////////////////// Receiver ///////////////////////////////
 
   public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 5322bfcd7ba..0722a016331 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -101,6 +101,15 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_air_gap_receiver_port",
                 Integer.toString(config.getPipeAirGapReceiverPort()))));
+    config.setPipeAirGapRetryLocalIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_air_gap_retry_local_interval_ms",
+                Long.toString(config.getPipeAirGapRetryLocalIntervalMs()))));
+    config.setPipeAirGapRetryMaxMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_air_gap_retry_max_ms", Long.toString(config.getPipeAirGapRetryMaxMs()))));
 
     config.setPipeMetaReportMaxLogNumPerRound(
         Double.parseDouble(
@@ -387,7 +396,7 @@ public class PipeDescriptor {
                     properties.getProperty(
                         "pipe_connector_rpc_thrift_compression_enabled",
                         String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
-    config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+    config.setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall(
         Long.parseLong(
             Optional.ofNullable(
                     properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))

Reply via email to