This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a1bf49d5d0 [ISSUE #7093] Avoid dispatch tasks too much cause dispatch
task failed (#7094)
a1bf49d5d0 is described below
commit a1bf49d5d07cf64374bc3dde5ab43add831433ad
Author: lizhimins <[email protected]>
AuthorDate: Tue Aug 1 15:56:34 2023 +0800
[ISSUE #7093] Avoid dispatch tasks too much cause dispatch task failed
(#7094)
* Avoid submitting too many dispatch tasks, which causes dispatch tasks to fail
* Make the scheduled dispatch task asynchronous
---
.../org/apache/rocketmq/tieredstore/TieredDispatcher.java | 11 ++++++-----
.../rocketmq/tieredstore/common/TieredStoreExecutor.java | 2 +-
2 files changed, 7 insertions(+), 6 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 523b0c2cde..bb58ea7dd5 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -82,7 +82,7 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(()
->
tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile ->
{
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
- dispatchFlatFile(flatFile);
+ dispatchFlatFileAsync(flatFile);
}
}), 30, 10, TimeUnit.SECONDS);
}
@@ -180,10 +180,6 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
message.release();
flatFile.getCompositeFlatFileLock().unlock();
}
- } else {
- if (!flatFile.getCompositeFlatFileLock().isLocked()) {
- this.dispatchFlatFileAsync(flatFile);
- }
}
}
@@ -199,6 +195,11 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
}
public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile,
Consumer<Long> consumer) {
+ // Avoid dispatch tasks too much
+ if (TieredStoreExecutor.dispatchThreadPoolQueue.size() >
+ TieredStoreExecutor.QUEUE_CAPACITY * 0.75) {
+ return;
+ }
TieredStoreExecutor.dispatchExecutor.execute(() -> {
try {
dispatchFlatFile(flatFile);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 23f1b01eac..6eb3478b3d 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
public class TieredStoreExecutor {
- private static final int QUEUE_CAPACITY = 10000;
+ public static final int QUEUE_CAPACITY = 10000;
// Visible for monitor
public static BlockingQueue<Runnable> dispatchThreadPoolQueue;