This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch optimize-pipe-after-stop
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/optimize-pipe-after-stop by
this push:
new 6ecf66fc722 op
6ecf66fc722 is described below
commit 6ecf66fc72229f87f9d6a6cd1952943b575d0cd8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Mar 10 13:25:28 2025 +0800
op
---
.../thrift/async/IoTDBDataRegionAsyncConnector.java | 18 +++++++++++++++++-
.../org/apache/iotdb/commons/conf/CommonConfig.java | 11 +++++++++++
.../apache/iotdb/commons/conf/CommonDescriptor.java | 9 +++++++++
.../apache/iotdb/commons/pipe/config/PipeConfig.java | 7 +++++++
4 files changed, 44 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index d98b93d9526..942e60df567 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
@@ -92,6 +93,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private final IoTDBDataRegionSyncConnector retryConnector = new
IoTDBDataRegionSyncConnector();
private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
+ private final long maxRetryExecutionTimeMsPerCall =
+
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
private IoTDBDataNodeAsyncClientManager clientManager;
@@ -416,11 +419,19 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
* @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
*/
private void transferQueuedEventsIfNecessary() throws Exception {
+ if (retryEventQueue.isEmpty()) {
+ return;
+ }
+
+ final long currentTime = System.currentTimeMillis();
while (!retryEventQueue.isEmpty()) {
synchronized (this) {
- if (isClosed.get() || retryEventQueue.isEmpty()) {
+ if (isClosed.get()) {
return;
}
+ if (retryEventQueue.isEmpty()) {
+ break;
+ }
final Event peekedEvent = retryEventQueue.peek();
@@ -453,6 +464,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
LOGGER.debug("Polled event {} from retry queue.", polledEvent);
}
}
+
+ // Stop retrying if the execution time exceeds the threshold for better realtime performance
+ if (System.currentTimeMillis() - currentTime >
maxRetryExecutionTimeMsPerCall) {
+ break;
+ }
}
// Trigger cron heartbeat event in retry connector to send batch in time
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 340e7e26462..c432ea718ef 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -235,6 +235,7 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+ private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
private int pipeAsyncConnectorMaxClientNumber =
@@ -859,6 +860,16 @@ public class CommonConfig {
return pipeConnectorRPCThriftCompressionEnabled;
}
+ public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+ long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
+ this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
+ pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+ }
+
+ public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+ return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+ }
+
public int getPipeAsyncConnectorSelectorNumber() {
return pipeAsyncConnectorSelectorNumber;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 10b6978149c..b3613bb6677 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -421,6 +421,15 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+ config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+ Long.parseLong(
+ Optional.ofNullable(
+
properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))
+ .orElse(
+ properties.getProperty(
+
"pipe_async_connector_max_retry_execution_time_ms_per_call",
+ String.valueOf(
+
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
int pipeAsyncConnectorSelectorNumber =
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index c0cc1a4d9fb..0f558782f00 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -152,6 +152,10 @@ public class PipeConfig {
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
}
+ public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+ return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+ }
+
public int getPipeAsyncConnectorSelectorNumber() {
return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
}
@@ -434,6 +438,9 @@ public class PipeConfig {
"PipeRemainingTimeCommitRateAverageTime: {}",
getPipeRemainingTimeCommitRateAverageTime());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}",
getPipeTsFileScanParsingThreshold());
+ LOGGER.info(
+ "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
+ getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());