This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b9bbd5206 [ISSUE #6230] Optimizes ScheduleMessageService code and logic (#6231)
b9bbd5206 is described below
commit b9bbd520603814de6bcfb76a811fc6dac4afc21b
Author: Lobo Xu <[email protected]>
AuthorDate: Mon Mar 6 10:22:29 2023 +0800
[ISSUE #6230] Optimizes ScheduleMessageService code and logic (#6231)
Co-authored-by: loboxu <[email protected]>
---
.../broker/schedule/ScheduleMessageService.java | 72 +++++++---------------
1 file changed, 22 insertions(+), 50 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index bdc9c5672..e91c32b55 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -17,10 +17,7 @@
package org.apache.rocketmq.broker.schedule;
import io.opentelemetry.api.common.Attributes;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@@ -32,8 +29,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.common.ConfigManager;
@@ -96,14 +93,11 @@ public class ScheduleMessageService extends ConfigManager {
this.enableAsyncDeliver =
brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
scheduledPersistService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true,
brokerController.getBrokerConfig()));
- scheduledPersistService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- ScheduleMessageService.this.persist();
- } catch (Throwable e) {
- log.error("scheduleAtFixedRate flush exception", e);
- }
+ scheduledPersistService.scheduleAtFixedRate(() -> {
+ try {
+ ScheduleMessageService.this.persist();
+ } catch (Throwable e) {
+ log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000,
this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(),
TimeUnit.MILLISECONDS);
}
@@ -117,9 +111,7 @@ public class ScheduleMessageService extends ConfigManager {
}
public void buildRunningStats(HashMap<String, String> stats) {
- Iterator<Map.Entry<Integer, Long>> it =
this.offsetTable.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<Integer, Long> next = it.next();
+ for (Map.Entry<Integer, Long> next : this.offsetTable.entrySet()) {
int queueId = delayLevel2QueueId(next.getKey());
long delayOffset = next.getValue();
long maxOffset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
queueId);
@@ -169,17 +161,13 @@ public class ScheduleMessageService extends ConfigManager {
}
}
- this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- if (started.get()) {
- ScheduleMessageService.this.persist();
- }
- } catch (Throwable e) {
- log.error("scheduleAtFixedRate flush exception", e);
+ this.deliverExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ if (started.get()) {
+ ScheduleMessageService.this.persist();
}
+ } catch (Throwable e) {
+ log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000,
this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(),
TimeUnit.MILLISECONDS);
}
@@ -208,10 +196,8 @@ public class ScheduleMessageService extends ConfigManager {
}
}
- if (this.deliverPendingTable != null) {
- for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
- log.warn("deliverPendingTable level: {}, size: {}", i,
this.deliverPendingTable.get(i).size());
- }
+ for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
+ log.warn("deliverPendingTable level: {}, size: {}", i,
this.deliverPendingTable.get(i).size());
}
this.persist();
@@ -378,17 +364,6 @@ public class ScheduleMessageService extends ConfigManager {
return msgInner;
}
- public int computeDelayLevel(long timeMillis) {
- long intervalMillis = timeMillis - System.currentTimeMillis();
- List<Map.Entry<Integer, Long>> sortedLevels =
delayLevelTable.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getValue)).collect(Collectors.toList());
- for (Map.Entry<Integer, Long> entry : sortedLevels) {
- if (entry.getValue() > intervalMillis) {
- return entry.getKey();
- }
- }
- return sortedLevels.get(sortedLevels.size() - 1).getKey();
- }
-
class DeliverDelayedMessageTimerTask implements Runnable {
private final int delayLevel;
private final long offset;
@@ -402,7 +377,7 @@ public class ScheduleMessageService extends ConfigManager {
public void run() {
try {
if (isStarted()) {
- this.executeOnTimeup();
+ this.executeOnTimeUp();
}
} catch (Exception e) {
// XXX: warn and notify me
@@ -411,9 +386,6 @@ public class ScheduleMessageService extends ConfigManager {
}
}
- /**
- * @return
- */
private long correctDeliverTimestamp(final long now, final long
deliverTimestamp) {
long result = deliverTimestamp;
@@ -426,7 +398,7 @@ public class ScheduleMessageService extends ConfigManager {
return result;
}
- public void executeOnTimeup() {
+ public void executeOnTimeUp() {
ConsumeQueueInterface cq =
ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
@@ -633,7 +605,7 @@ public class ScheduleMessageService extends ConfigManager {
private boolean autoResend = false;
private CompletableFuture<PutMessageResult> future;
- private volatile int resendCount = 0;
+ private volatile AtomicInteger resendCount = new AtomicInteger(0);
private volatile ProcessStatus status = ProcessStatus.RUNNING;
public PutResultProcess setTopic(String topic) {
@@ -712,7 +684,7 @@ public class ScheduleMessageService extends ConfigManager {
return future;
}
- public int getResendCount() {
+ public AtomicInteger getResendCount() {
return resendCount;
}
@@ -795,7 +767,7 @@ public class ScheduleMessageService extends ConfigManager {
// Gradually increase the resend interval.
try {
- Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+ Thread.sleep(Math.min(this.resendCount.incrementAndGet() *
100, 60 * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -823,13 +795,13 @@ public class ScheduleMessageService extends ConfigManager
{
public boolean need2Blocked() {
int maxResendNum2Blocked =
ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig()
.getScheduleAsyncDeliverMaxResendNum2Blocked();
- return this.resendCount > maxResendNum2Blocked;
+ return this.resendCount.get() > maxResendNum2Blocked;
}
public boolean need2Skip() {
int maxResendNum2Blocked =
ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig()
.getScheduleAsyncDeliverMaxResendNum2Blocked();
- return this.resendCount > maxResendNum2Blocked * 2;
+ return this.resendCount.get() > maxResendNum2Blocked * 2;
}
@Override