This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch air-gap-local
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/air-gap-local by this push:
     new 3122f197b65 mipl
3122f197b65 is described below

commit 3122f197b652bf16fe5093a9d17c261294d4de23
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 17:13:41 2026 +0800

    mipl
---
 .../protocol/airgap/IoTDBAirGapReceiver.java       | 30 +---------------------
 .../apache/iotdb/commons/conf/CommonConfig.java    |  8 +++++-
 2 files changed, 8 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 49e642371c6..5d2e7013b34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -133,35 +133,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
                   .setVersion(ReadWriteIOUtils.readByte(byteBuffer))
                   .setType(ReadWriteIOUtils.readShort(byteBuffer))
                   .setBody(byteBuffer.slice());
-      final TPipeTransferResp resp = agent.receive(req);
-
-      final TSStatus status = resp.getStatus();
-      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        ok();
-      } else if (status.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
-          || status.getCode()
-              == 
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
-        LOGGER.info(
-            "Pipe air gap receiver {}: TSStatus {} is encountered at the air 
gap receiver, will ignore.",
-            receiverId,
-            resp.getStatus());
-        ok();
-      } else if (status.getCode()
-          == 
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
-        final long startTime = System.currentTimeMillis();
-        while (System.currentTimeMillis() - startTime
-            < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
-          agent.receive(req);
-          
Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
-        }
-      } else {
-        LOGGER.warn(
-            "Pipe air gap receiver {}: Handle data failed, status: {}, req: 
{}",
-            receiverId,
-            resp.getStatus(),
-            req);
-        fail();
-      }
+      handleReq(req, System.currentTimeMillis());
     } catch (final PipeConnectionException e) {
       LOGGER.info(
           "Pipe air gap receiver {}: Socket {} closed when listening to data. 
Because: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b5cf9cd1a3c..cf68da89553 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProper
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
 import org.apache.iotdb.commons.enums.PipeRateAverage;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -1630,8 +1631,13 @@ public class CommonConfig {
     logger.info("pipeAirGapRetryLocalIntervalMs is set to {}.", 
pipeAirGapRetryLocalIntervalMs);
   }
 
+  // < 0 : 0.8 * transfer timeout to avoid timeout
+  // = 0 : Disable retry
+  // > 0 : Explicit configuration
   public long getPipeAirGapRetryMaxMs() {
-    return pipeAirGapRetryMaxMs;
+    return pipeAirGapRetryMaxMs >= 0
+        ? pipeAirGapRetryMaxMs
+        : (long) (PipeConfig.getInstance().getPipeSinkTransferTimeoutMs() * 
0.8);
   }
 
   public void setPipeAirGapRetryMaxMs(long pipeAirGapRetryMaxMs) {

Reply via email to