This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6ec3a5144b1 Pipe: Configured the air gap timeout to avoid packet loss (#17231)
6ec3a5144b1 is described below
commit 6ec3a5144b16eee6897f1baab48642c47e4e80c3
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 4 14:17:09 2026 +0800
Pipe: Configured the air gap timeout to avoid packet loss (#17231)
* good-game
* ifx
* fxi
* xif
---
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 5 +++++
.../org/apache/iotdb/commons/conf/CommonConfig.java | 21 +++++++++++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 4 ++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 7 +++++++
4 files changed, 37 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 5e2dfb6f9a0..5bee20c4dc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -99,6 +99,9 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
final AirGapSocket socket = sockets.get(socketIndex);
try {
+ // When receiver encountered packet loss, the transfer will time out
+ // We need to restore the transfer quickly by retry under this circumstance
+ socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
} else {
@@ -112,6 +115,8 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
"Network error when transfer tablet insertion event %s, because %s.",
((EnrichedEvent) tabletInsertionEvent).coreReportMessage(),
e.getMessage()),
e);
+ } finally {
+ socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cf68da89553..a490107ded3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -270,6 +270,7 @@ public class CommonConfig {
private long pipeSourceMatcherCacheSize = 1024;
private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+ private int pipeAirGapSinkTabletTimeoutMs = 60 * 1000; // 1 min
private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
private int pipeSinkReadFileBufferSize = 5242880; // 5MB
private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
@@ -1077,6 +1078,26 @@ public class CommonConfig {
}
}
+ public int getPipeAirGapSinkTabletTimeoutMs() {
+ return pipeAirGapSinkTabletTimeoutMs;
+ }
+
+ public void setPipeAirGapSinkTabletTimeoutMs(long
pipeAirGapSinkTabletTimeoutMs) {
+ final int fPipeAirGapSinkTabletTimeoutMs =
this.pipeAirGapSinkTabletTimeoutMs;
+ try {
+ this.pipeAirGapSinkTabletTimeoutMs =
Math.toIntExact(pipeAirGapSinkTabletTimeoutMs);
+ } catch (ArithmeticException e) {
+ this.pipeAirGapSinkTabletTimeoutMs = Integer.MAX_VALUE;
+ logger.warn(
+ "Given pipe air gap sink tablet timeout is too large, set to {} ms.", Integer.MAX_VALUE);
+ } finally {
+ if (fPipeAirGapSinkTabletTimeoutMs !=
this.pipeAirGapSinkTabletTimeoutMs) {
+ logger.info(
+ "pipeAirGapSinkTabletTimeoutMs is set to {}.",
this.pipeAirGapSinkTabletTimeoutMs);
+ }
+ }
+ }
+
public int getPipeSinkTransferTimeoutMs() {
return pipeSinkTransferTimeoutMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a49caa53368..09325cc959b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -179,6 +179,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
}
+ public int getPipeAirGapSinkTabletTimeoutMs() {
+ return COMMON_CONFIG.getPipeAirGapSinkTabletTimeoutMs();
+ }
+
public int getPipeSinkTransferTimeoutMs() {
return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 832517b9745..2135bac0012 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -360,6 +360,13 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+ config.setPipeAirGapSinkTabletTimeoutMs(
+ Long.parseLong(
+
Optional.ofNullable(properties.getProperty("pipe_air_gap_sink_tablet_timeout_ms"))
+ .orElse(
+ properties.getProperty(
+ "pipe_air_gap_connector_tablet_timeout_ms",
+
String.valueOf(config.getPipeAirGapSinkTabletTimeoutMs())))));
config.setPipeSinkReadFileBufferSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))