This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new be0b921ea88 Pipe: Add retry for tablet batch req to avoid
retransmission when memory is insufficient (#15715)
be0b921ea88 is described below
commit be0b921ea88fd8cbf5ee35cbfc5cd45a11f70f3b
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Jun 16 15:23:23 2025 +0800
Pipe: Add retry for tablet batch req to avoid retransmission when memory is
insufficient (#15715)
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 63 +++++++++++++---------
1 file changed, 39 insertions(+), 24 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index b462fb8d36b..b8634f92b64 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -473,10 +473,10 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
Stream.of(
statementPair.getLeft().isEmpty()
? RpcUtils.SUCCESS_STATUS
- :
executeStatementAndAddRedirectInfo(statementPair.getLeft()),
+ :
executeBatchStatementAndAddRedirectInfo(statementPair.getLeft()),
statementPair.getRight().isEmpty()
? RpcUtils.SUCCESS_STATUS
- :
executeStatementAndAddRedirectInfo(statementPair.getRight()))
+ :
executeBatchStatementAndAddRedirectInfo(statementPair.getRight()))
.collect(Collectors.toList())));
}
@@ -486,7 +486,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
PipeReceiverStatusHandler.getPriorStatus(
(statementSet.isEmpty()
? Stream.of(RpcUtils.SUCCESS_STATUS)
- :
statementSet.stream().map(this::executeStatementAndAddRedirectInfo))
+ :
statementSet.stream().map(this::executeBatchStatementAndAddRedirectInfo))
.collect(Collectors.toList())));
}
@@ -734,8 +734,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
* request. So for each sub-status which needs to redirect, we record the
device path using the
* message field.
*/
- private TSStatus executeStatementAndAddRedirectInfo(final
InsertBaseStatement statement) {
- final TSStatus result = executeStatementAndClassifyExceptions(statement);
+ private TSStatus executeBatchStatementAndAddRedirectInfo(final
InsertBaseStatement statement) {
+ final TSStatus result = executeStatementAndClassifyExceptions(statement,
5);
if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
&& result.getSubStatusSize() > 0) {
@@ -771,15 +771,46 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private TSStatus executeStatementAndClassifyExceptions(final Statement
statement) {
+ return executeStatementAndClassifyExceptions(statement, 1);
+ }
+
+ private TSStatus executeStatementAndClassifyExceptions(
+ final Statement statement, final int tryCount) {
long estimatedMemory = 0L;
final double pipeReceiverActualToEstimatedMemoryRatio =
PIPE_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
try {
if (statement instanceof InsertBaseStatement) {
estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
- allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .forceAllocate((long) (estimatedMemory *
pipeReceiverActualToEstimatedMemoryRatio));
+ for (int i = 0; i < tryCount; ++i) {
+ try {
+ allocatedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocate(
+ (long) (estimatedMemory *
pipeReceiverActualToEstimatedMemoryRatio));
+ break;
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+ if (i == tryCount - 1) {
+ final String message =
+ String.format(
+ "Temporarily out of memory when executing statement %s,
Requested memory: %s, "
+ + "used memory: %s, free memory: %s, total
non-floating memory: %s",
+ statement,
+ estimatedMemory *
pipeReceiverActualToEstimatedMemoryRatio,
+
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
+
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
+
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Receiver id = {}: {}", receiverId.get(),
message, e);
+ }
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(message);
+ } else {
+ Thread.sleep(100L * (i + 1));
+ }
+ }
+ }
}
final TSStatus result =
@@ -795,22 +826,6 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
result);
return statement.accept(STATEMENT_STATUS_VISITOR, result);
}
- } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
- final String message =
- String.format(
- "Temporarily out of memory when executing statement %s,
Requested memory: %s, "
- + "used memory: %s, free memory: %s, total non-floating
memory: %s",
- statement,
- estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio,
- PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
- PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
-
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
- }
- return new TSStatus(
-
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
- .setMessage(message);
} catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Exception encountered while executing statement
{}: ",