This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch air-gap-local
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/air-gap-local by this push:
     new d3beee534aa some-part
d3beee534aa is described below

commit d3beee534aa597f5c5916dffb10cd604a77842d0
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 14:17:20 2026 +0800

    some-part
---
 .../protocol/airgap/IoTDBAirGapReceiver.java       | 40 ++++++++++++++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  8 +++++
 2 files changed, 48 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 3cea6c998f8..0a10db6f433 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -146,6 +146,14 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
             receiverId,
             resp.getStatus());
         ok();
+      } else if (status.getCode()
+          == 
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+        final long startTime = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTime
+            < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
+          agent.receive(req);
+          
Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
+        }
       } else {
         LOGGER.warn(
             "Pipe air gap receiver {}: Handle data failed, status: {}, req: 
{}",
@@ -171,6 +179,38 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
     }
   }
 
+  /**
+   * Hands {@code req} to the receiver agent and replies through {@code ok()} or {@code fail()}
+   * according to the returned status.
+   *
+   * <p>While the status is {@code PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION} the request is
+   * re-submitted, sleeping {@code getPipeAirGapRetryLocalIntervalMs()} between attempts. Retrying
+   * stops once {@code getPipeAirGapRetryMaxMs()} has elapsed since the first attempt or the
+   * current thread is interrupted; the request is then reported as failed.
+   *
+   * @param req the request to hand to the agent
+   * @throws IOException propagated from the calls made while replying
+   */
+  private void handleReq(final AirGapPseudoTPipeTransferRequest req) throws IOException {
+    final long startTime = System.currentTimeMillis();
+    TSStatus status = agent.receive(req).getStatus();
+
+    // Retry in a loop rather than by recursion so that a long outage cannot grow the call stack,
+    // and bound the total retry time so the handler always terminates.
+    while (status.getCode()
+            == TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()
+        && System.currentTimeMillis() - startTime
+            < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
+      try {
+        Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
+      } catch (final InterruptedException e) {
+        // Keep the interrupt flag for the caller and stop retrying: with the flag set every
+        // further sleep would throw immediately, turning the retry into a busy spin.
+        Thread.currentThread().interrupt();
+        break;
+      }
+      status = agent.receive(req).getStatus();
+    }
+
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      ok();
+    } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || status.getCode()
+            == TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+      LOGGER.info(
+          "Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.",
+          receiverId,
+          status);
+      ok();
+    } else {
+      // Also reached when the receiver is still temporarily unavailable after the retry budget
+      // is exhausted or the thread was interrupted.
+      LOGGER.warn(
+          "Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
+          receiverId,
+          status,
+          req);
+      fail();
+    }
+  }
+
   private void ok() throws IOException {
     final OutputStream outputStream = socket.getOutputStream();
     outputStream.write(AirGapOneByteResponse.OK);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1ed1e39911f..12882cb83dd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -323,6 +323,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAirGapReceiverPort();
   }
 
+  /**
+   * Returns the air gap local retry interval as reported by the common config.
+   *
+   * @return the value of {@code COMMON_CONFIG.getPipeAirGapRetryLocalIntervalMs()}; presumably
+   *     milliseconds, going by the name
+   */
+  public long getPipeAirGapRetryLocalIntervalMs() {
+    final long retryLocalIntervalMs = COMMON_CONFIG.getPipeAirGapRetryLocalIntervalMs();
+    return retryLocalIntervalMs;
+  }
+
+  /**
+   * Returns the air gap maximum retry duration as reported by the common config.
+   *
+   * @return the value of {@code COMMON_CONFIG.getPipeAirGapRetryMaxMs()}; presumably
+   *     milliseconds, going by the name
+   */
+  public long getPipeAirGapRetryMaxMs() {
+    final long retryMaxMs = COMMON_CONFIG.getPipeAirGapRetryMaxMs();
+    return retryMaxMs;
+  }
+
   /////////////////////////////// Receiver ///////////////////////////////
 
   public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {

Reply via email to