This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new cfad276cdf9 Pipe: Configured the air gap timeout to avoid packet loss (#17231) (#17253)
cfad276cdf9 is described below

commit cfad276cdf9c39b66e5db41b042af768fb13e695
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 4 18:39:09 2026 +0800

    Pipe: Configured the air gap timeout to avoid packet loss (#17231) (#17253)
    
    * good-game
    
    * ifx
    
    * fxi
    
    * xif
---
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java  |  5 +++++
 .../org/apache/iotdb/commons/conf/CommonConfig.java | 21 +++++++++++++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java       |  4 ++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java   |  7 +++++++
 4 files changed, 37 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index e652325cb52..a2f09cf5e50 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -95,6 +95,9 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink {
     final AirGapSocket socket = sockets.get(socketIndex);
 
     try {
+      // When receiver encountered packet loss, the transfer will time out
+      // We need to restore the transfer quickly by retry under this circumstance
+      socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
       } else {
@@ -108,6 +111,8 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink {
               "Network error when transfer tablet insertion event %s, because %s.",
               ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()),
           e);
+    } finally {
+      socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
     }
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ddaa5f8ab07..0d4e15a9959 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -266,6 +266,7 @@ public class CommonConfig {
   private long pipeSourceMatcherCacheSize = 1024;
 
   private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+  private int pipeAirGapSinkTabletTimeoutMs = 60 * 1000; // 1 min
   private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
   private int pipeSinkReadFileBufferSize = 5242880; // 5MB
   private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
@@ -1019,6 +1020,26 @@ public class CommonConfig {
     }
   }
 
+  public int getPipeAirGapSinkTabletTimeoutMs() {
+    return pipeAirGapSinkTabletTimeoutMs;
+  }
+
+  public void setPipeAirGapSinkTabletTimeoutMs(long pipeAirGapSinkTabletTimeoutMs) {
+    final int fPipeAirGapSinkTabletTimeoutMs = this.pipeAirGapSinkTabletTimeoutMs;
+    try {
+      this.pipeAirGapSinkTabletTimeoutMs = Math.toIntExact(pipeAirGapSinkTabletTimeoutMs);
+    } catch (ArithmeticException e) {
+      this.pipeAirGapSinkTabletTimeoutMs = Integer.MAX_VALUE;
+      logger.warn(
+          "Given pipe air gap sink tablet timeout is too large, set to {} ms.", Integer.MAX_VALUE);
+    } finally {
+      if (fPipeAirGapSinkTabletTimeoutMs != this.pipeAirGapSinkTabletTimeoutMs) {
+        logger.info(
+            "pipeAirGapSinkTabletTimeoutMs is set to {}.", this.pipeAirGapSinkTabletTimeoutMs);
+      }
+    }
+  }
+
   public int getPipeSinkTransferTimeoutMs() {
     return pipeSinkTransferTimeoutMs;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 88dfaf50948..44e0004dc3e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -179,6 +179,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
   }
 
+  public int getPipeAirGapSinkTabletTimeoutMs() {
+    return COMMON_CONFIG.getPipeAirGapSinkTabletTimeoutMs();
+  }
+
   public int getPipeSinkTransferTimeoutMs() {
     return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 0722a016331..51d88e78345 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -360,6 +360,13 @@ public class PipeDescriptor {
                     properties.getProperty(
                         "pipe_connector_handshake_timeout_ms",
                         String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+    config.setPipeAirGapSinkTabletTimeoutMs(
+        Long.parseLong(
+            Optional.ofNullable(properties.getProperty("pipe_air_gap_sink_tablet_timeout_ms"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_air_gap_connector_tablet_timeout_ms",
+                        String.valueOf(config.getPipeAirGapSinkTabletTimeoutMs())))));
     config.setPipeSinkReadFileBufferSize(
         Integer.parseInt(
             Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))

Reply via email to