This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch no-stack
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/no-stack by this push:
new fea1c42d7c9 good-game
fea1c42d7c9 is described below
commit fea1c42d7c9f982e4aa2f07008ac6c790aae3ec7
Author: Caideyipi <[email protected]>
AuthorDate: Sat Feb 28 16:33:14 2026 +0800
good-game
---
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 3 +++
.../org/apache/iotdb/commons/conf/CommonConfig.java | 21 +++++++++++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 4 ++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 7 +++++++
.../commons/pipe/sink/protocol/IoTDBAirGapSink.java | 2 +-
5 files changed, 36 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 5e2dfb6f9a0..9d7f4451fea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -99,6 +99,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
final AirGapSocket socket = sockets.get(socketIndex);
try {
+ socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
} else {
@@ -112,6 +113,8 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
"Network error when transfer tablet insertion event %s, because %s.",
((EnrichedEvent) tabletInsertionEvent).coreReportMessage(),
e.getMessage()),
e);
+ } finally {
+ socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cf68da89553..a490107ded3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -270,6 +270,7 @@ public class CommonConfig {
private long pipeSourceMatcherCacheSize = 1024;
private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+ private int pipeAirGapSinkTabletTimeoutMs = 60 * 1000; // 1 min
private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
private int pipeSinkReadFileBufferSize = 5242880; // 5MB
private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
@@ -1077,6 +1078,26 @@ public class CommonConfig {
}
}
+ public int getPipeAirGapSinkTabletTimeoutMs() {
+ return pipeAirGapSinkTabletTimeoutMs;
+ }
+
+ public void setPipeAirGapSinkTabletTimeoutMs(long
pipeAirGapSinkTabletTimeoutMs) {
+ final int fPipeAirGapSinkTabletTimeoutMs =
this.pipeAirGapSinkTabletTimeoutMs;
+ try {
+ this.pipeAirGapSinkTabletTimeoutMs =
Math.toIntExact(pipeAirGapSinkTabletTimeoutMs);
+ } catch (ArithmeticException e) {
+ this.pipeAirGapSinkTabletTimeoutMs = Integer.MAX_VALUE;
+ logger.warn(
+ "Given pipe air gap sink tablet timeout is too large, set to {} ms.", Integer.MAX_VALUE);
+ } finally {
+ if (fPipeAirGapSinkTabletTimeoutMs !=
this.pipeAirGapSinkTabletTimeoutMs) {
+ logger.info(
+ "pipeAirGapSinkTabletTimeoutMs is set to {}.",
this.pipeAirGapSinkTabletTimeoutMs);
+ }
+ }
+ }
+
public int getPipeSinkTransferTimeoutMs() {
return pipeSinkTransferTimeoutMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a49caa53368..09325cc959b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -179,6 +179,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
}
+ public int getPipeAirGapSinkTabletTimeoutMs() {
+ return COMMON_CONFIG.getPipeAirGapSinkTabletTimeoutMs();
+ }
+
public int getPipeSinkTransferTimeoutMs() {
return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 832517b9745..2135bac0012 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -360,6 +360,13 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+ config.setPipeAirGapSinkTabletTimeoutMs(
+ Long.parseLong(
+
Optional.ofNullable(properties.getProperty("pipe_air_gap_sink_tablet_timeout_ms"))
+ .orElse(
+ properties.getProperty(
+ "pipe_air_gap_connector_tablet_timeout_ms",
+
String.valueOf(config.getPipeAirGapSinkTabletTimeoutMs())))));
config.setPipeSinkReadFileBufferSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index 7d84e3bb98f..1f2467947fe 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -242,7 +242,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
} else {
supportModsIfIsDataNodeReceiver = true;
}
- socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
+ socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
LOGGER.info("Handshake success. Socket: {}", socket);
}