This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1e7c9c0885a Pipe: fixed the bug that the processor dies when it encounters
a temporary memory shortage for tablets (#12042)
1e7c9c0885a is described below
commit 1e7c9c0885ab3c13e8898a7b34dffa9174cb1ab2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 19 19:00:58 2024 +0800
Pipe: fixed the bug that the processor dies when it encounters a temporary
memory shortage for tablets (#12042)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../thrift/sync/IoTDBThriftSyncClientManager.java | 3 +--
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 ++--
.../db/pipe/resource/memory/PipeMemoryManager.java | 22 +++++++++++++++++-----
.../pipe/task/connection/PipeEventCollector.java | 2 ++
.../db/pipe/task/subtask/PipeDataNodeSubtask.java | 4 ++--
.../subtask/processor/PipeProcessorSubtask.java | 11 +++++++++++
.../processor/PipeProcessorSubtaskWorker.java | 6 +-----
.../commons/pipe/task/subtask/PipeSubtask.java | 4 ----
8 files changed, 36 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index 6cc5c5ac683..db7844a213b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,7 +73,7 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
}
}
- public void checkClientStatusAndTryReconstructIfNecessary() throws
IOException {
+ public void checkClientStatusAndTryReconstructIfNecessary() {
// reconstruct all dead clients
for (final Map.Entry<TEndPoint, Pair<IoTDBThriftSyncConnectorClient,
Boolean>> entry :
endPoint2ClientAndStatus.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 93fe966409c..eca75a95cec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -42,7 +42,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
private final EnrichedEvent sourceEvent;
private boolean needToReport;
- private PipeMemoryBlock allocatedMemoryBlock;
+ private PipeTabletMemoryBlock allocatedMemoryBlock;
private TabletInsertionDataContainer dataContainer;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 3f7a3eb2849..c268d5bb542 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -61,7 +61,7 @@ public class PipeMemoryManager {
// threshold, allocations of memory block for tablets will be rejected.
private static final double TABLET_MEMORY_REJECT_THRESHOLD =
PipeConfig.getInstance().getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold();
- private long usedMemorySizeInBytesOfTablets;
+ private volatile long usedMemorySizeInBytesOfTablets;
private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
@@ -78,8 +78,13 @@ public class PipeMemoryManager {
return forceAllocate(sizeInBytes, false);
}
- public PipeMemoryBlock forceAllocateWithRetry(Tablet tablet)
+ public PipeTabletMemoryBlock forceAllocateWithRetry(Tablet tablet)
throws PipeRuntimeOutOfMemoryCriticalException {
+ if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+ // No need to calculate the tablet size, skip it to save time
+ return new PipeTabletMemoryBlock(0);
+ }
+
for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
if ((double) usedMemorySizeInBytesOfTablets / TOTAL_MEMORY_SIZE_IN_BYTES
< TABLET_MEMORY_REJECT_THRESHOLD) {
@@ -104,7 +109,8 @@ public class PipeMemoryManager {
}
synchronized (this) {
- final PipeMemoryBlock block =
forceAllocate(calculateTabletSizeInBytes(tablet), true);
+ final PipeTabletMemoryBlock block =
+ (PipeTabletMemoryBlock)
forceAllocate(calculateTabletSizeInBytes(tablet), true);
usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
return block;
}
@@ -113,8 +119,9 @@ public class PipeMemoryManager {
private PipeMemoryBlock forceAllocate(long sizeInBytes, boolean isForTablet)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
- // No need to consider isForTablet, for memory control is disabled
- return new PipeMemoryBlock(sizeInBytes);
+ return isForTablet
+ ? new PipeTabletMemoryBlock(sizeInBytes)
+ : new PipeMemoryBlock(sizeInBytes);
}
for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
@@ -155,6 +162,11 @@ public class PipeMemoryManager {
if (usedThreshold < 0.0f || usedThreshold > 1.0f) {
return null;
}
+
+ if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+ return new PipeMemoryBlock(sizeInBytes);
+ }
+
if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes
&& (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES <
usedThreshold) {
return forceAllocate(sizeInBytes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 50a91c82648..9ee6ff3f836 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -70,6 +70,8 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
} else {
collectEvent(event);
}
+ } catch (PipeException e) {
+ throw e;
} catch (Exception e) {
throw new PipeException("Error occurred when collecting events from
processor.", e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
index 9b434423c4f..c7a487ab61c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java
@@ -70,8 +70,8 @@ public abstract class PipeDataNodeSubtask extends PipeSubtask
{
throwable);
}
- if (retryCount.get() < MAX_RETRY_TIMES) {
- retryCount.incrementAndGet();
+ retryCount.incrementAndGet();
+ if (retryCount.get() <= MAX_RETRY_TIMES) {
LOGGER.warn(
"Retry executing subtask {} (creation time: {}, simple class: {}),
retry count [{}/{}]",
taskID,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 2d3006b4166..8ef5fbb00f3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.pipe.task.subtask.processor;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.EventSupplier;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
@@ -139,6 +141,11 @@ public class PipeProcessorSubtask extends
PipeDataNodeSubtask {
}
}
releaseLastEvent(!isClosed.get() &&
outputEventCollector.hasNoCollectInvocationAfterReset());
+ } catch (PipeRuntimeOutOfMemoryCriticalException e) {
+ LOGGER.info(
+ "Temporarily out of memory in pipe event processing, will wait for
the memory to release.",
+ e);
+ return false;
} catch (Exception e) {
if (!isClosed.get()) {
throw new PipeException(
@@ -162,6 +169,10 @@ public class PipeProcessorSubtask extends
PipeDataNodeSubtask {
// and the worker will be submitted to the executor
}
+ public boolean isStoppedByException() {
+ return lastEvent instanceof EnrichedEvent && retryCount.get() >
MAX_RETRY_TIMES;
+ }
+
@Override
public void close() {
PipeProcessorMetrics.getInstance().deregister(taskID);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
index 645edb17a0e..64f01b24105 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
@@ -28,8 +28,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.db.pipe.task.subtask.PipeDataNodeSubtask.MAX_RETRY_TIMES;
-
public class PipeProcessorSubtaskWorker extends WrappedRunnable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeProcessorSubtaskWorker.class);
@@ -71,9 +69,7 @@ public class PipeProcessorSubtaskWorker extends
WrappedRunnable {
boolean canSleepBeforeNextRound = true;
for (final PipeProcessorSubtask subtask : subtasks.keySet()) {
- if (subtask.isClosed()
- || !subtask.isSubmittingSelf()
- || MAX_RETRY_TIMES <= subtask.getRetryCount()) {
+ if (subtask.isClosed() || !subtask.isSubmittingSelf() ||
subtask.isStoppedByException()) {
continue;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
index 98fd635b2be..bfc8a1c459b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
@@ -161,8 +161,4 @@ public abstract class PipeSubtask
public long getCreationTime() {
return creationTime;
}
-
- public int getRetryCount() {
- return retryCount.get();
- }
}