This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch air-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/air-13 by this push:
new 7dbdac4099c Pipe: Enabled retry locally for air gap receiver &
temporary unavailable exception (#17188)
7dbdac4099c is described below
commit 7dbdac4099cf32af2b8d5ea1e45c0d298572cae6
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 10 09:29:48 2026 +0800
Pipe: Enabled retry locally for air gap receiver & temporary unavailable
exception (#17188)
* fix
* some-part
* Update IoTDBAirGapReceiver.java
* fix
* mipl
* logger
---
.../protocol/airgap/IoTDBAirGapReceiver.java | 60 ++++++++++++++--------
.../apache/iotdb/commons/conf/CommonConfig.java | 52 +++++++++++++++----
.../iotdb/commons/pipe/config/PipeConfig.java | 8 +++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 11 +++-
4 files changed, 99 insertions(+), 32 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 610b9e5fe1a..0ff1834e8cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -133,27 +133,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
.setVersion(ReadWriteIOUtils.readByte(byteBuffer))
.setType(ReadWriteIOUtils.readShort(byteBuffer))
.setBody(byteBuffer.slice());
- final TPipeTransferResp resp = agent.receive(req);
-
- final TSStatus status = resp.getStatus();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- ok();
- } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
- || status.getCode()
- == TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
- LOGGER.info(
- "Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.",
- receiverId,
- resp.getStatus());
- ok();
- } else {
- LOGGER.warn(
- "Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
- receiverId,
- resp.getStatus(),
- req);
- fail();
- }
+ handleReq(req, System.currentTimeMillis());
} catch (final PipeConnectionException e) {
LOGGER.info(
"Pipe air gap receiver {}: Socket {} closed when listening to data. Because: {}",
@@ -171,6 +151,44 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
}
}
+ private void handleReq(final AirGapPseudoTPipeTransferRequest req, final long startTime)
+ throws IOException {
+ final TPipeTransferResp resp = agent.receive(req);
+
+ final TSStatus status = resp.getStatus();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ ok();
+ } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+ || status.getCode()
+ == TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+ LOGGER.info(
+ "Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.",
+ receiverId,
+ resp.getStatus());
+ ok();
+ } else if (status.getCode()
+ == TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+ try {
+ Thread.sleep(PipeConfig.getInstance().getPipeAirGapRetryLocalIntervalMs());
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ LOGGER.info(
+ "Temporary unavailable exception encountered at air gap receiver, will retry locally.");
+ if (System.currentTimeMillis() - startTime
+ < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
+ handleReq(req, startTime);
+ }
+ } else {
+ LOGGER.warn(
+ "Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
+ receiverId,
+ resp.getStatus(),
+ req);
+ fail();
+ }
+ }
+
private void ok() throws IOException {
final OutputStream outputStream = socket.getOutputStream();
outputStream.write(AirGapOneByteResponse.OK);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 62c389df758..ddaa5f8ab07 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProper
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.enums.PipeRateAverage;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.rpc.RpcUtils;
@@ -275,7 +276,7 @@ public class CommonConfig {
private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
- private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
+ private long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = 500;
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
private int pipeAsyncConnectorMaxClientNumber =
@@ -301,6 +302,9 @@ public class CommonConfig {
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;
+ private long pipeAirGapRetryLocalIntervalMs = 1000L;
+ private long pipeAirGapRetryMaxMs = -1;
+
private long pipeReceiverLoginPeriodicVerificationIntervalMs = -1;
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
@@ -1126,21 +1130,20 @@ public class CommonConfig {
return pipeAsyncSinkForcedRetryTotalEventQueueSize;
}
- public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
- long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
- if (this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall
- == pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
+ public void setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall(
+ long pipeAsyncSinkMaxRetryExecutionTimeMsPerCall) {
+ if (this.pipeAsyncSinkMaxRetryExecutionTimeMsPerCall
+ == pipeAsyncSinkMaxRetryExecutionTimeMsPerCall) {
return;
}
- this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
- pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+ this.pipeAsyncSinkMaxRetryExecutionTimeMsPerCall = pipeAsyncSinkMaxRetryExecutionTimeMsPerCall;
logger.info(
- "pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall is set to {}.",
- pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
+ "pipeAsyncSinkMaxRetryExecutionTimeMsPerCall is set to {}.",
+ pipeAsyncSinkMaxRetryExecutionTimeMsPerCall);
}
public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
- return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+ return pipeAsyncSinkMaxRetryExecutionTimeMsPerCall;
}
public int getPipeAsyncSinkSelectorNumber() {
@@ -1558,6 +1561,35 @@ public class CommonConfig {
return pipeAirGapReceiverPort;
}
+ public long getPipeAirGapRetryLocalIntervalMs() {
+ return pipeAirGapRetryLocalIntervalMs;
+ }
+
+ public void setPipeAirGapRetryLocalIntervalMs(long pipeAirGapRetryLocalIntervalMs) {
+ if (pipeAirGapRetryLocalIntervalMs == this.pipeAirGapRetryLocalIntervalMs) {
+ return;
+ }
+ this.pipeAirGapRetryLocalIntervalMs = pipeAirGapRetryLocalIntervalMs;
+ logger.info("pipeAirGapRetryLocalIntervalMs is set to {}.", pipeAirGapRetryLocalIntervalMs);
+ }
+
+ // < 0 : 0.8 * transfer timeout to avoid timeout
+ // = 0 : Disable retry
+ // > 0 : Explicit configuration
+ public long getPipeAirGapRetryMaxMs() {
+ return pipeAirGapRetryMaxMs >= 0
+ ? pipeAirGapRetryMaxMs
+ : (long) (PipeConfig.getInstance().getPipeSinkTransferTimeoutMs() * 0.8);
+ }
+
+ public void setPipeAirGapRetryMaxMs(long pipeAirGapRetryMaxMs) {
+ if (pipeAirGapRetryMaxMs == this.pipeAirGapRetryMaxMs) {
+ return;
+ }
+ this.pipeAirGapRetryMaxMs = pipeAirGapRetryMaxMs;
+ logger.info("pipeAirGapRetryMaxMs is set to {}.", pipeAirGapRetryMaxMs);
+ }
+
public void setPipeReceiverLoginPeriodicVerificationIntervalMs(
long pipeReceiverLoginPeriodicVerificationIntervalMs) {
if (this.pipeReceiverLoginPeriodicVerificationIntervalMs
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index fbf4cf191ac..88dfaf50948 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -335,6 +335,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAirGapReceiverPort();
}
+ public long getPipeAirGapRetryLocalIntervalMs() {
+ return COMMON_CONFIG.getPipeAirGapRetryLocalIntervalMs();
+ }
+
+ public long getPipeAirGapRetryMaxMs() {
+ return COMMON_CONFIG.getPipeAirGapRetryMaxMs();
+ }
+
/////////////////////////////// Receiver ///////////////////////////////
public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 5322bfcd7ba..0722a016331 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -101,6 +101,15 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_air_gap_receiver_port",
Integer.toString(config.getPipeAirGapReceiverPort()))));
+ config.setPipeAirGapRetryLocalIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_air_gap_retry_local_interval_ms",
+ Long.toString(config.getPipeAirGapRetryLocalIntervalMs()))));
+ config.setPipeAirGapRetryMaxMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_air_gap_retry_max_ms", Long.toString(config.getPipeAirGapRetryMaxMs()))));
config.setPipeMetaReportMaxLogNumPerRound(
Double.parseDouble(
@@ -387,7 +396,7 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
- config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+ config.setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall(
Long.parseLong(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))