This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 29c9186525d Pipe: Enable dynamic adjusting of allocated pipe memory
(#11518)
29c9186525d is described below
commit 29c9186525d8397359f1644c7955eca8ac7a2899
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 13 21:55:26 2023 +0800
Pipe: Enable dynamic adjusting of allocated pipe memory (#11518)
Allocated pipe memory blocks now shrink automatically when a new allocation
is requested and the free space is insufficient, and they expand periodically
when enough total space is available.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/agent/runtime/PipeCronEventInjector.java | 72 -----------
.../agent/runtime/PipePeriodicalJobExecutor.java | 100 +++++++++++++++
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 7 +-
...oTDBThriftAsyncPipeTransferBatchReqBuilder.java | 2 +-
...IoTDBThriftSyncPipeTransferBatchReqBuilder.java | 2 +-
.../builder/PipeTransferBatchReqBuilder.java | 30 ++++-
.../realtime/assigner/DisruptorQueue.java | 2 +-
.../db/pipe/resource/memory/PipeMemoryBlock.java | 137 ++++++++++++++++++++-
.../db/pipe/resource/memory/PipeMemoryManager.java | 115 ++++++++++++++---
.../dataregion/wal/utils/WALInsertNodeCache.java | 62 ++++++++--
.../iotdb/commons/concurrent/ThreadName.java | 4 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 9 ++
.../iotdb/commons/conf/CommonDescriptor.java | 5 +
.../iotdb/commons/pipe/config/PipeConfig.java | 4 +
14 files changed, 430 insertions(+), 121 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
deleted file mode 100644
index 2198d7608b4..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class PipeCronEventInjector {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeCronEventInjector.class);
-
- private static final long CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
-
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
-
- private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.PIPE_RUNTIME_CRON_EVENT_INJECTOR.getName());
-
- private Future<?> injectorFuture;
-
- public synchronized void start() {
- if (injectorFuture == null) {
- injectorFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- CRON_EVENT_INJECTOR_EXECUTOR,
- this::inject,
- CRON_EVENT_INJECTOR_INTERVAL_SECONDS,
- CRON_EVENT_INJECTOR_INTERVAL_SECONDS,
- TimeUnit.SECONDS);
- LOGGER.info("Pipe cron event injector is started successfully.");
- }
- }
-
- private synchronized void inject() {
- PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(false);
- }
-
- public synchronized void stop() {
- if (injectorFuture != null) {
- injectorFuture.cancel(false);
- injectorFuture = null;
- LOGGER.info("Pipe cron event injector is stopped successfully.");
- }
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
new file mode 100644
index 00000000000..df9240cabe4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Single thread to execute pipe periodical jobs on dataNode. This is for
limiting the thread num on
+ * the DataNode instance.
+ */
+public class PipePeriodicalJobExecutor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
+
+ private static final ScheduledExecutorService PERIODICAL_JOB_EXECUTOR =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR.getName());
+
+ private static final long CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
+
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
+ private long cronEventInjectRoundsInterval;
+
+ private static final long MEMORY_EXPANDER_INTERVAL_SECONDS =
+ PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds();
+ private long memoryExpandRoundsInterval;
+
+ // Currently we use the CRON_EVENT_INJECTOR_INTERVAL_SECONDS as minimum
interval
+ private static final long EXECUTOR_INTERVAL_SECONDS =
CRON_EVENT_INJECTOR_INTERVAL_SECONDS;
+ private long rounds;
+
+ private Future<?> executorFuture;
+
+ public synchronized void start() {
+ if (executorFuture == null) {
+ rounds = 0;
+ cronEventInjectRoundsInterval =
+ Math.max(CRON_EVENT_INJECTOR_INTERVAL_SECONDS /
EXECUTOR_INTERVAL_SECONDS, 1);
+ memoryExpandRoundsInterval =
+ Math.max(MEMORY_EXPANDER_INTERVAL_SECONDS /
EXECUTOR_INTERVAL_SECONDS, 1);
+
+ executorFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ PERIODICAL_JOB_EXECUTOR,
+ this::execute,
+ EXECUTOR_INTERVAL_SECONDS,
+ EXECUTOR_INTERVAL_SECONDS,
+ TimeUnit.SECONDS);
+ LOGGER.info("Pipe periodical job executor is started successfully.");
+ }
+ }
+
+ private synchronized void execute() {
+ ++rounds;
+
+ if (rounds % cronEventInjectRoundsInterval == 0) {
+ PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(false);
+ }
+
+ if (rounds % memoryExpandRoundsInterval == 0) {
+ PipeResourceManager.memory().tryExpandAll();
+ }
+ }
+
+ public synchronized void stop() {
+ if (executorFuture != null) {
+ executorFuture.cancel(false);
+ executorFuture = null;
+ LOGGER.info("Pipe periodical job executor is stopped successfully.");
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index f7a2a5595f7..e9c6a50920f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -46,7 +46,8 @@ public class PipeRuntimeAgent implements IService {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
- private final PipeCronEventInjector pipeCronEventInjector = new
PipeCronEventInjector();
+ private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
+ new PipePeriodicalJobExecutor();
private final SimpleConsensusProgressIndexAssigner
simpleConsensusProgressIndexAssigner =
new SimpleConsensusProgressIndexAssigner();
@@ -69,7 +70,7 @@ public class PipeRuntimeAgent implements IService {
public synchronized void start() throws StartupException {
PipeConfig.getInstance().printAllConfigs();
PipeAgentLauncher.launchPipeTaskAgent();
- pipeCronEventInjector.start();
+ pipePeriodicalJobExecutor.start();
isShutdown.set(false);
}
@@ -81,7 +82,7 @@ public class PipeRuntimeAgent implements IService {
}
isShutdown.set(true);
- pipeCronEventInjector.stop();
+ pipePeriodicalJobExecutor.stop();
PipeAgent.task().dropAllPipeTasks();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
index 9de0dc84678..d08b9d50233 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -65,7 +65,7 @@ public class IoTDBThriftAsyncPipeTransferBatchReqBuilder
extends PipeTransferBat
bufferSize += req.getBody().length;
}
- return bufferSize >= maxBatchSizeInBytes
+ return bufferSize >= getMaxBatchSizeInBytes()
|| System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index 7347af503a7..bd6244ed600 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -59,7 +59,7 @@ public class IoTDBThriftSyncPipeTransferBatchReqBuilder
extends PipeTransferBatc
bufferSize += req.getBody().length;
}
- return bufferSize >= maxBatchSizeInBytes
+ return bufferSize >= getMaxBatchSizeInBytes()
|| System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index e301645e865..ff0e066dc8a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -60,7 +60,6 @@ public abstract class PipeTransferBatchReqBuilder implements
AutoCloseable {
// limit in buffer size
protected final PipeMemoryBlock allocatedMemoryBlock;
- protected final long maxBatchSizeInBytes;
protected long bufferSize = 0;
protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
@@ -74,13 +73,28 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
- allocatedMemoryBlock =
PipeResourceManager.memory().tryAllocate(requestMaxBatchSizeInBytes);
- maxBatchSizeInBytes = allocatedMemoryBlock.getMemoryUsageInBytes();
- if (maxBatchSizeInBytes != requestMaxBatchSizeInBytes) {
+
+ allocatedMemoryBlock =
+ PipeResourceManager.memory()
+ .tryAllocate(requestMaxBatchSizeInBytes)
+ .setShrinkMethod((oldMemory) -> Math.max(oldMemory / 2, 0))
+ .setShrinkCallback(
+ (oldMemory, newMemory) ->
+ LOGGER.info(
+ "The batch size limit has shrunk from {} to {}.",
oldMemory, newMemory))
+ .setExpandMethod(
+ (oldMemory) -> Math.min(Math.max(oldMemory, 1) * 2,
requestMaxBatchSizeInBytes))
+ .setExpandCallback(
+ (oldMemory, newMemory) ->
+ LOGGER.info(
+ "The batch size limit has expanded from {} to {}.",
oldMemory, newMemory));
+
+ if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
LOGGER.info(
- "PipeTransferBatchReqBuilder: the max batch size is adjusted from {}
to {}.",
+ "PipeTransferBatchReqBuilder: the max batch size is adjusted from {}
to {} due to the "
+ + "memory restriction",
requestMaxBatchSizeInBytes,
- maxBatchSizeInBytes);
+ getMaxBatchSizeInBytes());
}
}
@@ -88,6 +102,10 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
return reqs;
}
+ protected long getMaxBatchSizeInBytes() {
+ return allocatedMemoryBlock.getMemoryUsageInBytes();
+ }
+
public boolean isEmpty() {
return reqs.isEmpty();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index 39a23e0a9d5..7c8b19c0ea5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -56,7 +56,7 @@ public class DisruptorQueue {
allocatedMemoryBlock =
PipeResourceManager.memory()
.tryAllocate(
- ringBufferSize * ringBufferEntrySizeInBytes, (currentSize) ->
currentSize / 2);
+ ringBufferSize * ringBufferEntrySizeInBytes, currentSize ->
currentSize / 2);
disruptor =
new Disruptor<>(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 331eb32bd3c..7523965aa91 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -21,18 +21,133 @@ package org.apache.iotdb.db.pipe.resource.memory;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
public class PipeMemoryBlock implements AutoCloseable {
- private final long memoryUsageInBytes;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeMemoryBlock.class);
+
+ private final PipeMemoryManager pipeMemoryManager =
PipeResourceManager.memory();
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private final AtomicLong memoryUsageInBytes = new AtomicLong(0);
+
+ private final AtomicReference<Function<Long, Long>> shrinkMethod = new
AtomicReference<>();
+ private final AtomicReference<BiConsumer<Long, Long>> shrinkCallback = new
AtomicReference<>();
+ private final AtomicReference<Function<Long, Long>> expandMethod = new
AtomicReference<>();
+ private final AtomicReference<BiConsumer<Long, Long>> expandCallback = new
AtomicReference<>();
private volatile boolean isReleased = false;
public PipeMemoryBlock(long memoryUsageInBytes) {
- this.memoryUsageInBytes = memoryUsageInBytes;
+ this.memoryUsageInBytes.set(memoryUsageInBytes);
}
public long getMemoryUsageInBytes() {
- return memoryUsageInBytes;
+ return memoryUsageInBytes.get();
+ }
+
+ public void setMemoryUsageInBytes(long memoryUsageInBytes) {
+ this.memoryUsageInBytes.set(memoryUsageInBytes);
+ }
+
+ public PipeMemoryBlock setShrinkMethod(Function<Long, Long> shrinkMethod) {
+ this.shrinkMethod.set(shrinkMethod);
+ return this;
+ }
+
+ public PipeMemoryBlock setShrinkCallback(BiConsumer<Long, Long>
shrinkCallback) {
+ this.shrinkCallback.set(shrinkCallback);
+ return this;
+ }
+
+ public PipeMemoryBlock setExpandMethod(Function<Long, Long> extendMethod) {
+ this.expandMethod.set(extendMethod);
+ return this;
+ }
+
+ public PipeMemoryBlock setExpandCallback(BiConsumer<Long, Long>
expandCallback) {
+ this.expandCallback.set(expandCallback);
+ return this;
+ }
+
+ boolean shrink() {
+ if (lock.tryLock()) {
+ try {
+ return doShrink();
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+
+ private boolean doShrink() {
+ if (shrinkMethod.get() == null) {
+ return false;
+ }
+
+ final long oldMemorySizeInBytes = memoryUsageInBytes.get();
+ final long newMemorySizeInBytes =
shrinkMethod.get().apply(memoryUsageInBytes.get());
+
+ final long memoryInBytesCanBeReleased = oldMemorySizeInBytes -
newMemorySizeInBytes;
+ if (memoryInBytesCanBeReleased <= 0
+ || !pipeMemoryManager.release(this, memoryInBytesCanBeReleased)) {
+ return false;
+ }
+
+ if (shrinkCallback.get() != null) {
+ try {
+ shrinkCallback.get().accept(oldMemorySizeInBytes,
newMemorySizeInBytes);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to execute the shrink callback.", e);
+ }
+ }
+ return true;
+ }
+
+ boolean expand() {
+ if (lock.tryLock()) {
+ try {
+ return doExpand();
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+
+ private boolean doExpand() {
+ if (expandMethod.get() == null) {
+ return false;
+ }
+
+ final long oldMemorySizeInBytes = memoryUsageInBytes.get();
+ final long newMemorySizeInBytes =
expandMethod.get().apply(memoryUsageInBytes.get());
+
+ final long memoryInBytesNeededToBeAllocated = newMemorySizeInBytes -
oldMemorySizeInBytes;
+ if (memoryInBytesNeededToBeAllocated <= 0
+ || !pipeMemoryManager.tryAllocate(this,
memoryInBytesNeededToBeAllocated)) {
+ return false;
+ }
+
+ if (expandCallback.get() != null) {
+ try {
+ expandCallback.get().accept(oldMemorySizeInBytes,
newMemorySizeInBytes);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to execute the expand callback.", e);
+ }
+ }
+ return true;
}
boolean isReleased() {
@@ -45,6 +160,20 @@ public class PipeMemoryBlock implements AutoCloseable {
@Override
public void close() {
- PipeResourceManager.memory().release(this);
+ while (true) {
+ try {
+ if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
+ try {
+ pipeMemoryManager.release(this);
+ return;
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while waiting for the lock.", e);
+ }
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 644ee13e4e2..dd17f4bbad3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -30,8 +30,12 @@ import
org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
-import java.util.function.Function;
+import java.util.Set;
+import java.util.function.LongUnaryOperator;
public class PipeMemoryManager {
@@ -50,7 +54,9 @@ public class PipeMemoryManager {
private static final long MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES =
PipeConfig.getInstance().getPipeMemoryAllocateMinSizeInBytes();
- private long usedMemorySizeInBytes = 0;
+ private long usedMemorySizeInBytes;
+
+ private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
@@ -60,11 +66,11 @@ public class PipeMemoryManager {
for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
- usedMemorySizeInBytes += sizeInBytes;
- return new PipeMemoryBlock(sizeInBytes);
+ return registeredMemoryBlock(sizeInBytes);
}
try {
+ tryShrink4Allocate(sizeInBytes);
this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -146,18 +152,17 @@ public class PipeMemoryManager {
}
public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) {
- return tryAllocate(sizeInBytes, (currentSize) -> currentSize * 2 / 3);
+ return tryAllocate(sizeInBytes, currentSize -> currentSize * 2 / 3);
}
public synchronized PipeMemoryBlock tryAllocate(
- long sizeInBytes, Function<Long, Long> customAllocateStrategy) {
+ long sizeInBytes, LongUnaryOperator customAllocateStrategy) {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
return new PipeMemoryBlock(sizeInBytes);
}
if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
- usedMemorySizeInBytes += sizeInBytes;
- return new PipeMemoryBlock(sizeInBytes);
+ return registeredMemoryBlock(sizeInBytes);
}
long sizeToAllocateInBytes = sizeInBytes;
@@ -172,25 +177,83 @@ public class PipeMemoryManager {
usedMemorySizeInBytes,
sizeInBytes,
sizeToAllocateInBytes);
- usedMemorySizeInBytes += sizeToAllocateInBytes;
- return new PipeMemoryBlock(sizeToAllocateInBytes);
+ return registeredMemoryBlock(sizeToAllocateInBytes);
}
sizeToAllocateInBytes =
Math.max(
- customAllocateStrategy.apply(sizeToAllocateInBytes),
+ customAllocateStrategy.applyAsLong(sizeToAllocateInBytes),
MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES);
}
- LOGGER.warn(
- "tryAllocate: failed to allocate memory, "
- + "total memory size {} bytes, used memory size {} bytes, "
- + "requested memory size {} bytes",
- TOTAL_MEMORY_SIZE_IN_BYTES,
- usedMemorySizeInBytes,
- sizeInBytes);
+ if (tryShrink4Allocate(sizeToAllocateInBytes)) {
+ LOGGER.info(
+ "tryAllocate: allocated memory, "
+ + "total memory size {} bytes, used memory size {} bytes, "
+ + "original requested memory size {} bytes,"
+ + "actual requested memory size {} bytes",
+ TOTAL_MEMORY_SIZE_IN_BYTES,
+ usedMemorySizeInBytes,
+ sizeInBytes,
+ sizeToAllocateInBytes);
+ return registeredMemoryBlock(sizeToAllocateInBytes);
+ } else {
+ LOGGER.warn(
+ "tryAllocate: failed to allocate memory, "
+ + "total memory size {} bytes, used memory size {} bytes, "
+ + "requested memory size {} bytes",
+ TOTAL_MEMORY_SIZE_IN_BYTES,
+ usedMemorySizeInBytes,
+ sizeInBytes);
+ return registeredMemoryBlock(0);
+ }
+ }
+
+ public synchronized boolean tryAllocate(
+ PipeMemoryBlock block, long memoryInBytesNeededToBeAllocated) {
+ if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null ||
block.isReleased()) {
+ return false;
+ }
- return new PipeMemoryBlock(0);
+ if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >=
memoryInBytesNeededToBeAllocated) {
+ usedMemorySizeInBytes += memoryInBytesNeededToBeAllocated;
+ block.setMemoryUsageInBytes(block.getMemoryUsageInBytes() +
memoryInBytesNeededToBeAllocated);
+ return true;
+ }
+
+ return false;
+ }
+
+ private PipeMemoryBlock registeredMemoryBlock(long sizeInBytes) {
+ usedMemorySizeInBytes += sizeInBytes;
+
+ final PipeMemoryBlock returnedMemoryBlock = new
PipeMemoryBlock(sizeInBytes);
+ allocatedBlocks.add(returnedMemoryBlock);
+ return returnedMemoryBlock;
+ }
+
+ private boolean tryShrink4Allocate(long sizeInBytes) {
+ final List<PipeMemoryBlock> shuffledBlocks = new
ArrayList<>(allocatedBlocks);
+ Collections.shuffle(shuffledBlocks);
+
+ while (true) {
+ boolean hasAtLeastOneBlockShrinkable = false;
+ for (final PipeMemoryBlock block : shuffledBlocks) {
+ if (block.shrink()) {
+ hasAtLeastOneBlockShrinkable = true;
+ if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >=
sizeInBytes) {
+ return true;
+ }
+ }
+ }
+ if (!hasAtLeastOneBlockShrinkable) {
+ return false;
+ }
+ }
+ }
+
+ public synchronized void tryExpandAll() {
+ allocatedBlocks.forEach(PipeMemoryBlock::expand);
}
public synchronized void release(PipeMemoryBlock block) {
@@ -198,12 +261,26 @@ public class PipeMemoryManager {
return;
}
+ allocatedBlocks.remove(block);
usedMemorySizeInBytes -= block.getMemoryUsageInBytes();
block.markAsReleased();
this.notifyAll();
}
+ public synchronized boolean release(PipeMemoryBlock block, long sizeInBytes)
{
+ if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null ||
block.isReleased()) {
+ return false;
+ }
+
+ usedMemorySizeInBytes -= sizeInBytes;
+ block.setMemoryUsageInBytes(block.getMemoryUsageInBytes() - sizeInBytes);
+
+ this.notifyAll();
+
+ return true;
+ }
+
public long getUsedMemorySizeInBytes() {
return usedMemorySizeInBytes;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index ecb4bb3483a..b65bd1bb4e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -36,6 +36,7 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -50,6 +51,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
/** This cache is used by {@link WALEntryPosition}. */
public class WALInsertNodeCache {
@@ -59,7 +61,9 @@ public class WALInsertNodeCache {
// LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
private final PipeMemoryBlock allocatedMemoryBlock;
- private boolean isBatchLoadEnabled;
+ // Used to adjust the memory usage of the cache
+ private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
+ private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>>
lruCache;
// ids of all pinned memTables
@@ -68,21 +72,55 @@ public class WALInsertNodeCache {
private volatile boolean hasPipeRunning = false;
private WALInsertNodeCache(Integer dataRegionId) {
+ final long requestedAllocateSize =
+ (long)
+ Math.min(
+ 2 * CONFIG.getWalFileSizeThresholdInByte(),
+ CONFIG.getAllocateMemoryForPipe() * 0.8 / 5);
allocatedMemoryBlock =
PipeResourceManager.memory()
- .tryAllocate(
- (long)
- Math.min(
- 2 * CONFIG.getWalFileSizeThresholdInByte(),
- CONFIG.getAllocateMemoryForPipe() * 0.8 / 5));
- isBatchLoadEnabled =
- allocatedMemoryBlock.getMemoryUsageInBytes() >=
CONFIG.getWalFileSizeThresholdInByte();
+ .tryAllocate(requestedAllocateSize)
+ .setShrinkMethod((oldMemory) -> Math.max(oldMemory / 2, 1))
+ .setShrinkCallback(
+ (oldMemory, newMemory) -> {
+ memoryUsageCheatFactor.set(
+ memoryUsageCheatFactor.get() * ((double) oldMemory /
newMemory));
+ isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
+ LOGGER.info(
+ "WALInsertNodeCache.allocatedMemoryBlock of dataRegion
{} has shrunk from {} to {}.",
+ dataRegionId,
+ oldMemory,
+ newMemory);
+ })
+ .setExpandMethod(
+ (oldMemory) -> Math.min(Math.max(oldMemory, 1) * 2,
requestedAllocateSize))
+ .setExpandCallback(
+ (oldMemory, newMemory) -> {
+ memoryUsageCheatFactor.set(
+ memoryUsageCheatFactor.get() / ((double) newMemory /
oldMemory));
+ isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
+ LOGGER.info(
+ "WALInsertNodeCache allocatedMemoryBlock of dataRegion
{} has expanded from {} to {}.",
+ dataRegionId,
+ oldMemory,
+ newMemory);
+ });
+ isBatchLoadEnabled.set(
+ allocatedMemoryBlock.getMemoryUsageInBytes() >=
CONFIG.getWalFileSizeThresholdInByte());
lruCache =
Caffeine.newBuilder()
.maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
.weigher(
(Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
- (position, pair) -> position.getSize())
+ (position, pair) -> {
+ final long weightInLong =
+ (long) (position.getSize() *
memoryUsageCheatFactor.get());
+ if (weightInLong <= 0) {
+ return Integer.MAX_VALUE;
+ }
+ final int weightInInt = (int) weightInLong;
+ return weightInInt != weightInLong ? Integer.MAX_VALUE :
weightInInt;
+ })
.recordStats()
.build(new WALInsertNodeCacheLoader());
PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
@@ -153,7 +191,7 @@ public class WALInsertNodeCache {
hasPipeRunning = true;
final Pair<ByteBuffer, InsertNode> pair =
- isBatchLoadEnabled
+ isBatchLoadEnabled.get()
? lruCache.getAll(Collections.singleton(position)).get(position)
: lruCache.get(position);
@@ -282,12 +320,12 @@ public class WALInsertNodeCache {
@TestOnly
public boolean isBatchLoadEnabled() {
- return isBatchLoadEnabled;
+ return isBatchLoadEnabled.get();
}
@TestOnly
public void setIsBatchLoadEnabled(boolean isBatchLoadEnabled) {
- this.isBatchLoadEnabled = isBatchLoadEnabled;
+ this.isBatchLoadEnabled.set(isBatchLoadEnabled);
}
@TestOnly
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 7667b3af73a..e2340b02046 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -131,7 +131,7 @@ public enum ThreadName {
PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
- PIPE_RUNTIME_CRON_EVENT_INJECTOR("Pipe-Runtime-Cron-Event-Injector"),
+ PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
@@ -267,7 +267,7 @@ public enum ThreadName {
PIPE_RUNTIME_META_SYNCER,
PIPE_RUNTIME_HEARTBEAT,
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
- PIPE_RUNTIME_CRON_EVENT_INJECTOR,
+ PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
PIPE_WAL_RESOURCE_TTL_CHECKER,
PIPE_RECEIVER_AIR_GAP_AGENT,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e2043142971..5ca93c3bd07 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -192,6 +192,7 @@ public class CommonConfig {
private int pipeMemoryAllocateMaxRetries = 10;
private long pipeMemoryAllocateMinSizeInBytes = 32;
private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = 2 * 1024 *
1024; // 2MB
+ private long pipeMemoryExpanderIntervalSeconds = 3 * 60; // 3Min
/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
@@ -779,6 +780,14 @@ public class CommonConfig {
pipeMemoryAllocateForTsFileSequenceReaderInBytes;
}
+ public long getPipeMemoryExpanderIntervalSeconds() {
+ return pipeMemoryExpanderIntervalSeconds;
+ }
+
+ public void setPipeMemoryExpanderIntervalSeconds(long
pipeMemoryExpanderIntervalSeconds) {
+ this.pipeMemoryExpanderIntervalSeconds = pipeMemoryExpanderIntervalSeconds;
+ }
+
public int getPipeMemoryAllocateMaxRetries() {
return pipeMemoryAllocateMaxRetries;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 6893e9b85ba..a7ae02c1bea 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -428,6 +428,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_memory_allocate_for_tsfile_sequence_reader_in_bytes",
String.valueOf(config.getPipeMemoryAllocateForTsFileSequenceReaderInBytes()))));
+ config.setPipeMemoryExpanderIntervalSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_memory_expander_interval_seconds",
+
String.valueOf(config.getPipeMemoryExpanderIntervalSeconds()))));
}
public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4718f4df1e7..c70def090c5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -191,6 +191,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMemoryAllocateForTsFileSequenceReaderInBytes();
}
+ public long getPipeMemoryExpanderIntervalSeconds() {
+ return COMMON_CONFIG.getPipeMemoryExpanderIntervalSeconds();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);