This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new be0b921ea88 Pipe: Add retry for tablet batch req to avoid retransmission when memory is insufficient (#15715)
be0b921ea88 is described below

commit be0b921ea88fd8cbf5ee35cbfc5cd45a11f70f3b
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Jun 16 15:23:23 2025 +0800

    Pipe: Add retry for tablet batch req to avoid retransmission when memory is insufficient (#15715)
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 63 +++++++++++++---------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index b462fb8d36b..b8634f92b64 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -473,10 +473,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             Stream.of(
                     statementPair.getLeft().isEmpty()
                         ? RpcUtils.SUCCESS_STATUS
-                        : 
executeStatementAndAddRedirectInfo(statementPair.getLeft()),
+                        : 
executeBatchStatementAndAddRedirectInfo(statementPair.getLeft()),
                     statementPair.getRight().isEmpty()
                         ? RpcUtils.SUCCESS_STATUS
-                        : 
executeStatementAndAddRedirectInfo(statementPair.getRight()))
+                        : 
executeBatchStatementAndAddRedirectInfo(statementPair.getRight()))
                 .collect(Collectors.toList())));
   }
 
@@ -486,7 +486,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         PipeReceiverStatusHandler.getPriorStatus(
             (statementSet.isEmpty()
                     ? Stream.of(RpcUtils.SUCCESS_STATUS)
-                    : 
statementSet.stream().map(this::executeStatementAndAddRedirectInfo))
+                    : 
statementSet.stream().map(this::executeBatchStatementAndAddRedirectInfo))
                 .collect(Collectors.toList())));
   }
 
@@ -734,8 +734,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
    * request. So for each sub-status which needs to redirect, we record the 
device path using the
    * message field.
    */
-  private TSStatus executeStatementAndAddRedirectInfo(final 
InsertBaseStatement statement) {
-    final TSStatus result = executeStatementAndClassifyExceptions(statement);
+  private TSStatus executeBatchStatementAndAddRedirectInfo(final 
InsertBaseStatement statement) {
+    final TSStatus result = executeStatementAndClassifyExceptions(statement, 
5);
 
     if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
         && result.getSubStatusSize() > 0) {
@@ -771,15 +771,46 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TSStatus executeStatementAndClassifyExceptions(final Statement 
statement) {
+    return executeStatementAndClassifyExceptions(statement, 1);
+  }
+
+  private TSStatus executeStatementAndClassifyExceptions(
+      final Statement statement, final int tryCount) {
     long estimatedMemory = 0L;
     final double pipeReceiverActualToEstimatedMemoryRatio =
         PIPE_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
     try {
       if (statement instanceof InsertBaseStatement) {
         estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
-        allocatedMemoryBlock =
-            PipeDataNodeResourceManager.memory()
-                .forceAllocate((long) (estimatedMemory * 
pipeReceiverActualToEstimatedMemoryRatio));
+        for (int i = 0; i < tryCount; ++i) {
+          try {
+            allocatedMemoryBlock =
+                PipeDataNodeResourceManager.memory()
+                    .forceAllocate(
+                        (long) (estimatedMemory * 
pipeReceiverActualToEstimatedMemoryRatio));
+            break;
+          } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+            if (i == tryCount - 1) {
+              final String message =
+                  String.format(
+                      "Temporarily out of memory when executing statement %s, 
Requested memory: %s, "
+                          + "used memory: %s, free memory: %s, total 
non-floating memory: %s",
+                      statement,
+                      estimatedMemory * 
pipeReceiverActualToEstimatedMemoryRatio,
+                      
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
+                      
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
+                      
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+              if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Receiver id = {}: {}", receiverId.get(), 
message, e);
+              }
+              return new TSStatus(
+                      
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+                  .setMessage(message);
+            } else {
+              Thread.sleep(100L * (i + 1));
+            }
+          }
+        }
       }
 
       final TSStatus result =
@@ -795,22 +826,6 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             result);
         return statement.accept(STATEMENT_STATUS_VISITOR, result);
       }
-    } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
-      final String message =
-          String.format(
-              "Temporarily out of memory when executing statement %s, 
Requested memory: %s, "
-                  + "used memory: %s, free memory: %s, total non-floating 
memory: %s",
-              statement,
-              estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio,
-              PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
-              PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
-              
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
-      }
-      return new TSStatus(
-              
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(message);
     } catch (final Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Exception encountered while executing statement 
{}: ",

Reply via email to