This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ec3a5144b1 Pipe: Configured the air gap timeout to avoid packet loss (#17231)
6ec3a5144b1 is described below

commit 6ec3a5144b16eee6897f1baab48642c47e4e80c3
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 4 14:17:09 2026 +0800

    Pipe: Configured the air gap timeout to avoid packet loss (#17231)
    
    * good-game
    
    * ifx
    
    * fxi
    
    * xif
---
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java  |  5 +++++
 .../org/apache/iotdb/commons/conf/CommonConfig.java | 21 +++++++++++++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java       |  4 ++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java   |  7 +++++++
 4 files changed, 37 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 5e2dfb6f9a0..5bee20c4dc0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -99,6 +99,9 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
     final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
+      // When receiver encountered packet loss, the transfer will time out
+      // We need to restore the transfer quickly by retry under this circumstance
+      socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
       } else {
@@ -112,6 +115,8 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
               "Network error when transfer tablet insertion event %s, because 
%s.",
               ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), 
e.getMessage()),
           e);
+    } finally {
+      socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
     }
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cf68da89553..a490107ded3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -270,6 +270,7 @@ public class CommonConfig {
   private long pipeSourceMatcherCacheSize = 1024;
 
   private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+  private int pipeAirGapSinkTabletTimeoutMs = 60 * 1000; // 1 min
   private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
   private int pipeSinkReadFileBufferSize = 5242880; // 5MB
   private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
@@ -1077,6 +1078,26 @@ public class CommonConfig {
     }
   }
 
+  public int getPipeAirGapSinkTabletTimeoutMs() {
+    return pipeAirGapSinkTabletTimeoutMs;
+  }
+
+  public void setPipeAirGapSinkTabletTimeoutMs(long 
pipeAirGapSinkTabletTimeoutMs) {
+    final int fPipeAirGapSinkTabletTimeoutMs = 
this.pipeAirGapSinkTabletTimeoutMs;
+    try {
+      this.pipeAirGapSinkTabletTimeoutMs = 
Math.toIntExact(pipeAirGapSinkTabletTimeoutMs);
+    } catch (ArithmeticException e) {
+      this.pipeAirGapSinkTabletTimeoutMs = Integer.MAX_VALUE;
+      logger.warn(
+          "Given pipe air gap sink tablet timeout is too large, set to {} 
ms.", Integer.MAX_VALUE);
+    } finally {
+      if (fPipeAirGapSinkTabletTimeoutMs != 
this.pipeAirGapSinkTabletTimeoutMs) {
+        logger.info(
+            "pipeAirGapSinkTabletTimeoutMs is set to {}.", 
this.pipeAirGapSinkTabletTimeoutMs);
+      }
+    }
+  }
+
   public int getPipeSinkTransferTimeoutMs() {
     return pipeSinkTransferTimeoutMs;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a49caa53368..09325cc959b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -179,6 +179,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
   }
 
+  public int getPipeAirGapSinkTabletTimeoutMs() {
+    return COMMON_CONFIG.getPipeAirGapSinkTabletTimeoutMs();
+  }
+
   public int getPipeSinkTransferTimeoutMs() {
     return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 832517b9745..2135bac0012 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -360,6 +360,13 @@ public class PipeDescriptor {
                     properties.getProperty(
                         "pipe_connector_handshake_timeout_ms",
                         
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+    config.setPipeAirGapSinkTabletTimeoutMs(
+        Long.parseLong(
+            
Optional.ofNullable(properties.getProperty("pipe_air_gap_sink_tablet_timeout_ms"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_air_gap_connector_tablet_timeout_ms",
+                        
String.valueOf(config.getPipeAirGapSinkTabletTimeoutMs())))));
     config.setPipeSinkReadFileBufferSize(
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))

Reply via email to