This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e7c9c0885a Pipe: fixed the bug that processor dies when encountered memory shortage for tablets in a period of time (#12042)
1e7c9c0885a is described below

commit 1e7c9c0885ab3c13e8898a7b34dffa9174cb1ab2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 19 19:00:58 2024 +0800

    Pipe: fixed the bug that processor dies when encountered memory shortage for tablets in a period of time (#12042)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../thrift/sync/IoTDBThriftSyncClientManager.java  |  3 +--
 .../common/tablet/PipeRawTabletInsertionEvent.java |  4 ++--
 .../db/pipe/resource/memory/PipeMemoryManager.java | 22 +++++++++++++++++-----
 .../pipe/task/connection/PipeEventCollector.java   |  2 ++
 .../db/pipe/task/subtask/PipeDataNodeSubtask.java  |  4 ++--
 .../subtask/processor/PipeProcessorSubtask.java    | 11 +++++++++++
 .../processor/PipeProcessorSubtaskWorker.java      |  6 +-----
 .../commons/pipe/task/subtask/PipeSubtask.java     |  4 ----
 8 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index 6cc5c5ac683..db7844a213b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -74,7 +73,7 @@ public class IoTDBThriftSyncClientManager extends IoTDBThriftClientManager imple
     }
   }
 
-  public void checkClientStatusAndTryReconstructIfNecessary() throws IOException {
+  public void checkClientStatusAndTryReconstructIfNecessary() {
     // reconstruct all dead clients
     for (final Map.Entry<TEndPoint, Pair<IoTDBThriftSyncConnectorClient, Boolean>> entry :
         endPoint2ClientAndStatus.entrySet()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 93fe966409c..eca75a95cec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -42,7 +42,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
   private final EnrichedEvent sourceEvent;
   private boolean needToReport;
 
-  private PipeMemoryBlock allocatedMemoryBlock;
+  private PipeTabletMemoryBlock allocatedMemoryBlock;
 
   private TabletInsertionDataContainer dataContainer;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 3f7a3eb2849..c268d5bb542 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -61,7 +61,7 @@ public class PipeMemoryManager {
   // threshold, allocations of memory block for tablets will be rejected.
   private static final double TABLET_MEMORY_REJECT_THRESHOLD =
       PipeConfig.getInstance().getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold();
-  private long usedMemorySizeInBytesOfTablets;
+  private volatile long usedMemorySizeInBytesOfTablets;
 
   private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
 
@@ -78,8 +78,13 @@ public class PipeMemoryManager {
     return forceAllocate(sizeInBytes, false);
   }
 
-  public PipeMemoryBlock forceAllocateWithRetry(Tablet tablet)
+  public PipeTabletMemoryBlock forceAllocateWithRetry(Tablet tablet)
       throws PipeRuntimeOutOfMemoryCriticalException {
+    if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+      // No need to calculate the tablet size, skip it to save time
+      return new PipeTabletMemoryBlock(0);
+    }
+
     for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
       if ((double) usedMemorySizeInBytesOfTablets / TOTAL_MEMORY_SIZE_IN_BYTES
           < TABLET_MEMORY_REJECT_THRESHOLD) {
@@ -104,7 +109,8 @@ public class PipeMemoryManager {
     }
 
     synchronized (this) {
-      final PipeMemoryBlock block = forceAllocate(calculateTabletSizeInBytes(tablet), true);
+      final PipeTabletMemoryBlock block =
+          (PipeTabletMemoryBlock) forceAllocate(calculateTabletSizeInBytes(tablet), true);
       usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
       return block;
     }
@@ -113,8 +119,9 @@ public class PipeMemoryManager {
   private PipeMemoryBlock forceAllocate(long sizeInBytes, boolean isForTablet)
       throws PipeRuntimeOutOfMemoryCriticalException {
     if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
-      // No need to consider isForTablet, for memory control is disabled
-      return new PipeMemoryBlock(sizeInBytes);
+      return isForTablet
+          ? new PipeTabletMemoryBlock(sizeInBytes)
+          : new PipeMemoryBlock(sizeInBytes);
     }
 
     for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
@@ -155,6 +162,11 @@ public class PipeMemoryManager {
     if (usedThreshold < 0.0f || usedThreshold > 1.0f) {
       return null;
     }
+
+    if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+      return new PipeMemoryBlock(sizeInBytes);
+    }
+
     if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes
         && (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES < usedThreshold) {
       return forceAllocate(sizeInBytes);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 50a91c82648..9ee6ff3f836 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -70,6 +70,8 @@ public class PipeEventCollector implements EventCollector, AutoCloseable {
       } else {
         collectEvent(event);
       }
+    } catch (PipeException e) {
+      throw e;
     } catch (Exception e) {
       throw new PipeException("Error occurred when collecting events from processor.", e);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
index 9b434423c4f..c7a487ab61c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
@@ -70,8 +70,8 @@ public abstract class PipeDataNodeSubtask extends PipeSubtask {
           throwable);
     }
 
-    if (retryCount.get() < MAX_RETRY_TIMES) {
-      retryCount.incrementAndGet();
+    retryCount.incrementAndGet();
+    if (retryCount.get() <= MAX_RETRY_TIMES) {
       LOGGER.warn(
           "Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}]",
           taskID,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 2d3006b4166..8ef5fbb00f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.pipe.task.subtask.processor;
 
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.commons.pipe.task.EventSupplier;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
@@ -139,6 +141,11 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask {
         }
       }
       releaseLastEvent(!isClosed.get() && outputEventCollector.hasNoCollectInvocationAfterReset());
+    } catch (PipeRuntimeOutOfMemoryCriticalException e) {
+      LOGGER.info(
+          "Temporarily out of memory in pipe event processing, will wait for the memory to release.",
+          e);
+      return false;
     } catch (Exception e) {
       if (!isClosed.get()) {
         throw new PipeException(
@@ -162,6 +169,10 @@ public class PipeProcessorSubtask extends PipeDataNodeSubtask {
     // and the worker will be submitted to the executor
   }
 
+  public boolean isStoppedByException() {
+    return lastEvent instanceof EnrichedEvent && retryCount.get() > MAX_RETRY_TIMES;
+  }
+
   @Override
   public void close() {
     PipeProcessorMetrics.getInstance().deregister(taskID);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
index 645edb17a0e..64f01b24105 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
@@ -28,8 +28,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.db.pipe.task.subtask.PipeDataNodeSubtask.MAX_RETRY_TIMES;
-
 public class PipeProcessorSubtaskWorker extends WrappedRunnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorSubtaskWorker.class);
@@ -71,9 +69,7 @@ public class PipeProcessorSubtaskWorker extends WrappedRunnable {
     boolean canSleepBeforeNextRound = true;
 
     for (final PipeProcessorSubtask subtask : subtasks.keySet()) {
-      if (subtask.isClosed()
-          || !subtask.isSubmittingSelf()
-          || MAX_RETRY_TIMES <= subtask.getRetryCount()) {
+      if (subtask.isClosed() || !subtask.isSubmittingSelf() || subtask.isStoppedByException()) {
         continue;
       }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
index 98fd635b2be..bfc8a1c459b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
@@ -161,8 +161,4 @@ public abstract class PipeSubtask
   public long getCreationTime() {
     return creationTime;
   }
-
-  public int getRetryCount() {
-    return retryCount.get();
-  }
 }

Reply via email to