Git-Yang commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r779983206
##########
File path:
store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -308,158 +387,408 @@ public void executeOnTimeup() {
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
- long failScheduleOffset = offset;
+ if (cq == null) {
+ this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+ return;
+ }
- if (cq != null) {
- SelectMappedBufferResult bufferCQ =
cq.getIndexBuffer(this.offset);
- if (bufferCQ != null) {
- try {
- long nextOffset = offset;
- int i = 0;
- ConsumeQueueExt.CqExtUnit cqExtUnit = new
ConsumeQueueExt.CqExtUnit();
- for (; i < bufferCQ.getSize(); i +=
ConsumeQueue.CQ_STORE_UNIT_SIZE) {
- long offsetPy = bufferCQ.getByteBuffer().getLong();
- int sizePy = bufferCQ.getByteBuffer().getInt();
- long tagsCode = bufferCQ.getByteBuffer().getLong();
-
- if (cq.isExtAddr(tagsCode)) {
- if (cq.getExt(tagsCode, cqExtUnit)) {
- tagsCode = cqExtUnit.getTagsCode();
- } else {
- //can't find ext content.So re compute
tags code.
- log.error("[BUG] can't find consume queue
extend file content!addr={}, offsetPy={}, sizePy={}",
- tagsCode, offsetPy, sizePy);
- long msgStoreTime =
defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
- tagsCode =
computeDeliverTimestamp(delayLevel, msgStoreTime);
- }
- }
+ SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+ if (bufferCQ == null) {
+ long resetOffset;
+ if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
+ log.error("schedule CQ offset invalid. offset={},
cqMinOffset={}, queueId={}",
+ this.offset, resetOffset, cq.getQueueId());
+ } else if ((resetOffset = cq.getMaxOffsetInQueue()) <
this.offset) {
+ log.error("schedule CQ offset invalid. offset={},
cqMaxOffset={}, queueId={}",
+ this.offset, resetOffset, cq.getQueueId());
+ } else {
+ resetOffset = this.offset;
+ }
- long now = System.currentTimeMillis();
- long deliverTimestamp =
this.correctDeliverTimestamp(now, tagsCode);
-
- nextOffset = offset + (i /
ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
- long countdown = deliverTimestamp - now;
-
- if (countdown <= 0) {
- MessageExt msgExt =
-
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
- offsetPy, sizePy);
-
- if (msgExt != null) {
- try {
- MessageExtBrokerInner msgInner =
this.messageTimeup(msgExt);
- if
(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
- log.error("[BUG] the real topic of
schedule msg is {}, discard the msg. msg={}",
- msgInner.getTopic(), msgInner);
- continue;
- }
- PutMessageResult putMessageResult =
-
ScheduleMessageService.this.writeMessageStore
- .putMessage(msgInner);
-
- if (putMessageResult != null
- &&
putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
- if
(ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats())
{
-
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1,
putMessageResult.getAppendMessageResult().getMsgNum());
-
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1,
putMessageResult.getAppendMessageResult().getWroteBytes());
-
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
putMessageResult.getAppendMessageResult().getMsgNum());
-
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
putMessageResult.getAppendMessageResult().getWroteBytes());
-
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-
putMessageResult.getAppendMessageResult().getWroteBytes());
-
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
- }
- continue;
- } else {
- // XXX: warn and notify me
- log.error(
- "ScheduleMessageService, a
message time up, but reput it failed, topic: {} msgId {}",
- msgExt.getTopic(),
msgExt.getMsgId());
-
ScheduleMessageService.this.deliverExecutorService.schedule(
- new
DeliverDelayedMessageTimerTask(this.delayLevel,
- nextOffset),
DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
-
ScheduleMessageService.this.updateOffset(this.delayLevel,
- nextOffset);
- return;
- }
- } catch (Exception e) {
- /*
- * XXX: warn and notify me
- */
- log.error(
- "ScheduleMessageService,
messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={},
sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
- }
- }
- } else {
-
ScheduleMessageService.this.deliverExecutorService.schedule(
- new
DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
- countdown, TimeUnit.MILLISECONDS);
-
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
- return;
- }
- } // end of for
+ this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
+ return;
+ }
+
+ long nextOffset = this.offset;
+ try {
+ int i = 0;
+ ConsumeQueueExt.CqExtUnit cqExtUnit = new
ConsumeQueueExt.CqExtUnit();
+ for (; i < bufferCQ.getSize() && isStarted(); i +=
ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+ long offsetPy = bufferCQ.getByteBuffer().getLong();
+ int sizePy = bufferCQ.getByteBuffer().getInt();
+ long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+ if (cq.isExtAddr(tagsCode)) {
+ if (cq.getExt(tagsCode, cqExtUnit)) {
+ tagsCode = cqExtUnit.getTagsCode();
+ } else {
+ //can't find ext content.So re compute tags code.
+ log.error("[BUG] can't find consume queue extend
file content!addr={}, offsetPy={}, sizePy={}",
+ tagsCode, offsetPy, sizePy);
+ long msgStoreTime =
defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+ tagsCode = computeDeliverTimestamp(delayLevel,
msgStoreTime);
+ }
+ }
- nextOffset = offset + (i /
ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
ScheduleMessageService.this.deliverExecutorService.schedule(new
DeliverDelayedMessageTimerTask(
- this.delayLevel, nextOffset), DELAY_FOR_A_WHILE,
TimeUnit.MILLISECONDS);
-
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+ long now = System.currentTimeMillis();
+ long deliverTimestamp = this.correctDeliverTimestamp(now,
tagsCode);
+ nextOffset = offset + (i /
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+ long countdown = deliverTimestamp - now;
+ if (countdown > 0) {
+ this.scheduleNextTimerTask(nextOffset,
DELAY_FOR_A_WHILE);
return;
- } finally {
+ }
+
+ MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy,
sizePy);
+ if (msgExt == null) {
+ continue;
+ }
- bufferCQ.release();
+ MessageExtBrokerInner msgInner =
ScheduleMessageService.this.messageTimeup(msgExt);
+ if
(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+ log.error("[BUG] the real topic of schedule msg is {},
discard the msg. msg={}",
+ msgInner.getTopic(), msgInner);
+ continue;
}
- } // end of if (bufferCQ != null)
- else {
-
- long cqMinOffset = cq.getMinOffsetInQueue();
- long cqMaxOffset = cq.getMaxOffsetInQueue();
- if (offset < cqMinOffset) {
- failScheduleOffset = cqMinOffset;
- log.error("schedule CQ offset invalid. offset={},
cqMinOffset={}, cqMaxOffset={}, queueId={}",
- offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+ boolean deliverSuc;
+ if (ScheduleMessageService.this.enableAsyncDeliver) {
+ deliverSuc = this.asyncDeliver(msgInner,
msgExt.getMsgId(), offset, offsetPy, sizePy);
+ } else {
+ deliverSuc = this.syncDeliver(msgInner,
msgExt.getMsgId(), offset, offsetPy, sizePy);
}
- if (offset > cqMaxOffset) {
- failScheduleOffset = cqMaxOffset;
- log.error("schedule CQ offset invalid. offset={},
cqMinOffset={}, cqMaxOffset={}, queueId={}",
- offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+ if (!deliverSuc) {
+ this.scheduleNextTimerTask(nextOffset,
DELAY_FOR_A_WHILE);
+ return;
}
}
- } // end of if (cq != null)
- ScheduleMessageService.this.deliverExecutorService.schedule(new
DeliverDelayedMessageTimerTask(this.delayLevel,
- failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+ nextOffset = this.offset + (i /
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ } catch (Exception e) {
+ log.error("ScheduleMessageService, messageTimeup execute
error, offset = {}", nextOffset, e);
+ } finally {
+ bufferCQ.release();
+ }
+
+ this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+ }
+
+ public void scheduleNextTimerTask(long offset, long delay) {
+ ScheduleMessageService.this.deliverExecutorService.schedule(new
DeliverDelayedMessageTimerTask(
+ this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+ }
+
+ private boolean syncDeliver(MessageExtBrokerInner msgInner, String
msgId, long offset, long offsetPy,
+ int sizePy) {
+ PutResultProcess resultProcess = deliverMessage(msgInner, msgId,
offset, offsetPy, sizePy, false);
+ PutMessageResult result = resultProcess.get();
+ boolean sendStatus = result != null &&
result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+ if (sendStatus) {
+ ScheduleMessageService.this.updateOffset(this.delayLevel,
resultProcess.getNextOffset());
+ }
+ return sendStatus;
+ }
+
+ private boolean asyncDeliver(MessageExtBrokerInner msgInner, String
msgId, long offset, long offsetPy,
+ int sizePy) {
+ Queue<PutResultProcess> processesQueue =
ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+ //Flow Control
+ int currentPendingNum = processesQueue.size();
+ if (currentPendingNum >
ScheduleMessageService.this.maxPendingLimit) {
+ log.warn("Asynchronous deliver triggers flow control, " +
+ "currentPendingNum={}, maxPendingLimit={}",
currentPendingNum, maxPendingLimit);
+ return false;
+ }
+
+ //Blocked
+ PutResultProcess firstProcess = processesQueue.peek();
+ if (firstProcess != null && firstProcess.need2Blocked()) {
+ log.warn("Asynchronous deliver block. info={}",
firstProcess.toString());
+ return false;
+ }
+
+ PutResultProcess resultProcess = deliverMessage(msgInner, msgId,
offset, offsetPy, sizePy, true);
+ processesQueue.add(resultProcess);
+ return true;
+ }
+
+ private PutResultProcess deliverMessage(MessageExtBrokerInner
msgInner, String msgId, long offset,
+ long offsetPy, int sizePy, boolean autoResend) {
+ CompletableFuture<PutMessageResult> future =
+
ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+ return new PutResultProcess()
+ .setTopic(msgInner.getTopic())
+ .setDelayLevel(this.delayLevel)
+ .setOffset(offset)
+ .setPhysicOffset(offsetPy)
+ .setPhysicSize(sizePy)
+ .setMsgId(msgId)
+ .setAutoResend(autoResend)
+ .setFuture(future)
+ .thenProcess();
}
+ }
- private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setBody(msgExt.getBody());
- msgInner.setFlag(msgExt.getFlag());
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+ public class HandlePutResultTask implements Runnable {
+ private final int delayLevel;
- TopicFilterType topicFilterType =
MessageExt.parseTopicFilterType(msgInner.getSysFlag());
- long tagsCodeValue =
- MessageExtBrokerInner.tagsString2tagsCode(topicFilterType,
msgInner.getTags());
- msgInner.setTagsCode(tagsCodeValue);
-
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+ public HandlePutResultTask(int delayLevel) {
+ this.delayLevel = delayLevel;
+ }
- msgInner.setSysFlag(msgExt.getSysFlag());
- msgInner.setBornTimestamp(msgExt.getBornTimestamp());
- msgInner.setBornHost(msgExt.getBornHost());
- msgInner.setStoreHost(msgExt.getStoreHost());
- msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+ @Override
+ public void run() {
+ LinkedBlockingQueue<PutResultProcess> pendingQueue =
+
ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+ PutResultProcess putResultProcess;
+ while ((putResultProcess = pendingQueue.peek()) != null) {
+ try {
+ switch (putResultProcess.getStatus()) {
+ case SUCCESS:
+
ScheduleMessageService.this.updateOffset(this.delayLevel,
putResultProcess.getNextOffset());
+ pendingQueue.remove();
+ break;
+ case RUNNING:
+ break;
+ case EXCEPTION:
+ if (!isStarted()) {
+ log.warn("HandlePutResultTask shutdown,
info={}", putResultProcess.toString());
+ return;
+ }
+ log.warn("putResultProcess error, info={}",
putResultProcess.toString());
+ putResultProcess.onException();
+ break;
+ case SKIP:
+ log.warn("putResultProcess skip, info={}",
putResultProcess.toString());
+ pendingQueue.remove();
+ break;
+ }
+ } catch (Exception e) {
+ log.error("HandlePutResultTask exception. info={}",
putResultProcess.toString(), e);
+ putResultProcess.onException();
+ }
+ }
- msgInner.setWaitStoreMsgOK(false);
- MessageAccessor.clearProperty(msgInner,
MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+ if (isStarted()) {
+ ScheduleMessageService.this.handleExecutorService
+ .schedule(new HandlePutResultTask(this.delayLevel),
DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
-
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+ public class PutResultProcess {
+ private String topic;
+ private long offset;
+ private long physicOffset;
+ private int physicSize;
+ private int delayLevel;
+ private String msgId;
+ private boolean autoResend = false;
+ private CompletableFuture<PutMessageResult> future;
+
+ private volatile int resendCount = 0;
+ private volatile ProcessStatus status = ProcessStatus.RUNNING;
+
+ public PutResultProcess setTopic(String topic) {
+ this.topic = topic;
+ return this;
+ }
- String queueIdStr =
msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
- int queueId = Integer.parseInt(queueIdStr);
- msgInner.setQueueId(queueId);
+ public PutResultProcess setOffset(long offset) {
+ this.offset = offset;
+ return this;
+ }
- return msgInner;
+ public PutResultProcess setPhysicOffset(long physicOffset) {
+ this.physicOffset = physicOffset;
+ return this;
}
+
+ public PutResultProcess setPhysicSize(int physicSize) {
+ this.physicSize = physicSize;
+ return this;
+ }
+
+ public PutResultProcess setDelayLevel(int delayLevel) {
+ this.delayLevel = delayLevel;
+ return this;
+ }
+
+ public PutResultProcess setMsgId(String msgId) {
+ this.msgId = msgId;
+ return this;
+ }
+
+ public PutResultProcess setAutoResend(boolean autoResend) {
+ this.autoResend = autoResend;
+ return this;
+ }
+
+ public PutResultProcess setFuture(CompletableFuture<PutMessageResult>
future) {
+ this.future = future;
+ return this;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getNextOffset() {
+ return offset + 1;
+ }
+
+ public long getPhysicOffset() {
+ return physicOffset;
+ }
+
+ public int getPhysicSize() {
+ return physicSize;
+ }
+
+ public Integer getDelayLevel() {
+ return delayLevel;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public boolean isAutoResend() {
+ return autoResend;
+ }
+
+ public CompletableFuture<PutMessageResult> getFuture() {
+ return future;
+ }
+
+ public int getResendCount() {
+ return resendCount;
+ }
+
+ public PutResultProcess thenProcess() {
+ this.future.thenAccept(result -> {
+ this.handleResult(result);
+ });
+
+ this.future.exceptionally(e -> {
+ log.error("ScheduleMessageService put message exceptionally,
info: {}",
+ PutResultProcess.this.toString(), e);
+
+ onException();
+ return null;
+ });
+ return this;
+ }
+
+ private void handleResult(PutMessageResult result) {
+ if (result != null && result.getPutMessageStatus() ==
PutMessageStatus.PUT_OK) {
+ onSuccess(result);
+ } else {
+ log.warn("ScheduleMessageService put message failed. info:
{}.", result);
+ onException();
+ }
+ }
+
+ public void onSuccess(PutMessageResult result) {
+ this.status = ProcessStatus.SUCCESS;
+ if
(ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats())
{
+
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1,
result.getAppendMessageResult().getMsgNum());
+
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1,
result.getAppendMessageResult().getWroteBytes());
+
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
result.getAppendMessageResult().getMsgNum());
+
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
result.getAppendMessageResult().getWroteBytes());
+
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic,
result.getAppendMessageResult().getMsgNum(), 1);
+
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic,
result.getAppendMessageResult().getWroteBytes());
+
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+ }
+ }
+
+ public void onException() {
+ log.warn("ScheduleMessageService onException, info: {}",
this.toString());
+ if (this.autoResend) {
+ this.resend();
+ } else {
+ this.status = ProcessStatus.SKIP;
+ }
+ }
+
+ public ProcessStatus getStatus() {
+ return this.status;
+ }
+
+ public PutMessageResult get() {
+ try {
+ return this.future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR,
null);
+ }
+ }
+
+ private void resend() {
+ log.info("Resend message, info: {}", this.toString());
+
+ // Gradually increase the resend interval.
+ try {
+ Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset,
this.physicSize);
+ if (msgExt == null) {
+ log.warn("ScheduleMessageService resend not found message.
info: {}", this.toString());
+ this.status = need2Skip() ? ProcessStatus.SKIP :
ProcessStatus.EXCEPTION;
+ return;
+ }
+
+ MessageExtBrokerInner msgInner =
ScheduleMessageService.this.messageTimeup(msgExt);
+ PutMessageResult result =
ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
+ this.handleResult(result);
+ if (result != null && result.getPutMessageStatus() ==
PutMessageStatus.PUT_OK) {
+ log.info("Resend message success, info: {}",
this.toString());
+ }
+ } catch (Exception e) {
+ this.status = ProcessStatus.EXCEPTION;
+ log.error("Resend message error, info: {}", this.toString(),
e);
+ }
+ }
+
+ public boolean need2Blocked() {
+ return this.resendCount >
ScheduleMessageService.this.maxResendNum2Blocked;
+ }
+
+ public boolean need2Skip() {
+ return this.resendCount >
ScheduleMessageService.this.maxResendNum2Blocked * 2;
+ }
+
+ @Override
+ public String toString() {
+ return "PutResultProcess{" +
+ "topic='" + topic + '\'' +
+ ", offset=" + offset +
+ ", physicOffset=" + physicOffset +
+ ", physicSize=" + physicSize +
+ ", delayLevel=" + delayLevel +
+ ", msgId='" + msgId + '\'' +
+ ", autoResend=" + autoResend +
+ ", resendCount=" + resendCount +
+ ", status=" + status +
+ '}';
+ }
+ }
+
+ public enum ProcessStatus {
+ RUNNING,
+ SUCCESS,
+ EXCEPTION,
+ SKIP,
}
Review comment:
OK, I will add a comment describing this.
In some extreme cases, message delivery is skipped in order to avoid blocking.
For example:
When delivery of a message fails, it is retried. If the message can no longer
be found during the retry (for example, because it has expired), its delivery
is skipped so that the entire delay level does not become blocked and unavailable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]