This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new cfad276cdf9 Pipe: Configured the air gap timeout to avoid packet loss
(#17231) (#17253)
cfad276cdf9 is described below
commit cfad276cdf9c39b66e5db41b042af768fb13e695
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 4 18:39:09 2026 +0800
Pipe: Configured the air gap timeout to avoid packet loss (#17231) (#17253)
* good-game
* ifx
* fxi
* xif
---
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 5 +++++
.../org/apache/iotdb/commons/conf/CommonConfig.java | 21 +++++++++++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 4 ++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 7 +++++++
4 files changed, 37 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index e652325cb52..a2f09cf5e50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -95,6 +95,9 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
final AirGapSocket socket = sockets.get(socketIndex);
try {
+ // When receiver encountered packet loss, the transfer will time out
+ // We need to restore the transfer quickly by retry under this
circumstance
+ socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
} else {
@@ -108,6 +111,8 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
"Network error when transfer tablet insertion event %s, because
%s.",
((EnrichedEvent) tabletInsertionEvent).coreReportMessage(),
e.getMessage()),
e);
+ } finally {
+ socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ddaa5f8ab07..0d4e15a9959 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -266,6 +266,7 @@ public class CommonConfig {
private long pipeSourceMatcherCacheSize = 1024;
private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+ private int pipeAirGapSinkTabletTimeoutMs = 60 * 1000; // 1 min
private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
private int pipeSinkReadFileBufferSize = 5242880; // 5MB
private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
@@ -1019,6 +1020,26 @@ public class CommonConfig {
}
}
+ public int getPipeAirGapSinkTabletTimeoutMs() {
+ return pipeAirGapSinkTabletTimeoutMs;
+ }
+
+ public void setPipeAirGapSinkTabletTimeoutMs(long
pipeAirGapSinkTabletTimeoutMs) {
+ final int fPipeAirGapSinkTabletTimeoutMs =
this.pipeAirGapSinkTabletTimeoutMs;
+ try {
+ this.pipeAirGapSinkTabletTimeoutMs =
Math.toIntExact(pipeAirGapSinkTabletTimeoutMs);
+ } catch (ArithmeticException e) {
+ this.pipeAirGapSinkTabletTimeoutMs = Integer.MAX_VALUE;
+ logger.warn(
+ "Given pipe air gap sink tablet timeout is too large, set to {}
ms.", Integer.MAX_VALUE);
+ } finally {
+ if (fPipeAirGapSinkTabletTimeoutMs !=
this.pipeAirGapSinkTabletTimeoutMs) {
+ logger.info(
+ "pipeAirGapSinkTabletTimeoutMs is set to {}.",
this.pipeAirGapSinkTabletTimeoutMs);
+ }
+ }
+ }
+
public int getPipeSinkTransferTimeoutMs() {
return pipeSinkTransferTimeoutMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 88dfaf50948..44e0004dc3e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -179,6 +179,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
}
+ public int getPipeAirGapSinkTabletTimeoutMs() {
+ return COMMON_CONFIG.getPipeAirGapSinkTabletTimeoutMs();
+ }
+
public int getPipeSinkTransferTimeoutMs() {
return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 0722a016331..51d88e78345 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -360,6 +360,13 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+ config.setPipeAirGapSinkTabletTimeoutMs(
+ Long.parseLong(
+
Optional.ofNullable(properties.getProperty("pipe_air_gap_sink_tablet_timeout_ms"))
+ .orElse(
+ properties.getProperty(
+ "pipe_air_gap_connector_tablet_timeout_ms",
+
String.valueOf(config.getPipeAirGapSinkTabletTimeoutMs())))));
config.setPipeSinkReadFileBufferSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))