pandaapo commented on code in PR #4544:
URL: https://github.com/apache/eventmesh/pull/4544#discussion_r1384989412
##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +76,87 @@ protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
         timer.newTimeout(timeout.task(), tick, timeUnit);
     }
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
     @Override
     public void setExecuteTimeHook(long executeTime) {
         this.executeTime = executeTime;
     }
+
+    @Override
+    public final void run(Timeout timeout) throws Exception {
+        boolean eventMeshServerRetryStorageEnabled = commonConfiguration.isEventMeshServerRetryStorageEnabled();
+        if (eventMeshServerRetryStorageEnabled) {
+            if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+            }
+            Integer maxTimesPerEvent = STORAGE_RETRY_TIMES_MAP.get(event.getId());
+            if (maxTimesPerEvent < 1) {
+                sendMessageBack(event);
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+            } else {
+                getHandleMessageContext().finish();
+            }
+        } else {
+            doRun(timeout);
+        }
+    }
+
+    protected HandleMessageContext getHandleMessageContext() throws Exception {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    public abstract void doRun(Timeout timeout) throws Exception;
+
+    @SneakyThrows
+    protected ProducerManager getProducerManager() {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    @SneakyThrows
+    protected ConsumerGroupConf getConsumerGroupConf() {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    private void sendMessageBack(final CloudEvent event) throws Exception {
+        String topic = event.getSubject();
+        String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
+        String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
+
+        ConsumerGroupConf consumerGroupConf = getConsumerGroupConf();
+        String consumerGroupName = consumerGroupConf.getConsumerGroup();
+        EventMeshProducer sendMessageBack = getProducerManager().getEventMeshProducer(consumerGroupName);
+
+        if (sendMessageBack == null) {
+            log.warn("consumer:{} consume fail, sendMessageBack, topic:{}, bizSeqNo:{}, uniqueId:{}",
+                consumerGroupName, topic, bizSeqNo, uniqueId);
+            return;
+        }
+        TopicNameHelper topicNameGenerator = EventMeshExtensionFactory.getExtension(TopicNameHelper.class,
+            commonConfiguration.getEventMeshStoragePluginType());
+        String retryTopicName = topicNameGenerator.generateRetryTopicName(consumerGroupName);
Review Comment:
This sends the message to the MQ's retry queue for retry, or more precisely to RocketMQ's.
Other mainstream MQs do not have a retry queue like RocketMQ's. For example,
RabbitMQ's retry queue is a regular queue that is controlled only through the
`x-max-retries` attribute in the message header, and Kafka does not have a
retry queue.
This also vaguely reminds me of why PR #4179 was not merged. As I recall, the
concern was that this approach can only be implemented for RocketMQ and is
difficult to extend to other MQs. Is that the reason? @xwm1992
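To make the extensibility concern concrete, here is a rough sketch of how retry topic naming could differ per storage plugin. `RetryTopicNamer`, `RetryTopicNamingSketch`, the plugin keys, and the `-retry` suffix are hypothetical names invented for illustration; they are not EventMesh's `TopicNameHelper` API. The only broker-specific fact it relies on is that RocketMQ names a consumer group's retry topic `%RETRY%` followed by the group name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a per-plugin retry topic naming lookup.
// It is NOT the PR's TopicNameHelper; names and plugin keys are assumptions.
interface RetryTopicNamer {
    String generateRetryTopicName(String consumerGroup);
}

class RetryTopicNamingSketch {

    // RocketMQ's built-in retry topic for a group is "%RETRY%" + consumer group.
    static final RetryTopicNamer ROCKETMQ = group -> "%RETRY%" + group;

    // Assumption: a broker without a built-in retry topic would fall back to an
    // ordinary topic whose name follows a convention chosen by the runtime.
    static final RetryTopicNamer PLAIN_TOPIC = group -> group + "-retry";

    static final Map<String, RetryTopicNamer> BY_PLUGIN = new HashMap<>();

    static {
        BY_PLUGIN.put("rocketmq", ROCKETMQ);
        BY_PLUGIN.put("kafka", PLAIN_TOPIC);
    }

    static String retryTopicFor(String pluginType, String consumerGroup) {
        RetryTopicNamer namer = BY_PLUGIN.get(pluginType);
        if (namer == null) {
            // Fail loudly instead of silently producing to a topic nobody consumes.
            throw new IllegalArgumentException("no retry topic naming for storage plugin: " + pluginType);
        }
        return namer.generateRetryTopicName(consumerGroup);
    }

    public static void main(String[] args) {
        System.out.println(retryTopicFor("rocketmq", "group-a"));
        System.out.println(retryTopicFor("kafka", "group-a"));
    }
}
```

With a plain topic, redelivery and retry limits would have to be handled by the runtime itself, which is where the difficulty of extending this approach beyond RocketMQ shows up.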
##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +76,87 @@ protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
         timer.newTimeout(timeout.task(), tick, timeUnit);
     }
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
     @Override
     public void setExecuteTimeHook(long executeTime) {
         this.executeTime = executeTime;
     }
+
+    @Override
+    public final void run(Timeout timeout) throws Exception {
+        boolean eventMeshServerRetryStorageEnabled = commonConfiguration.isEventMeshServerRetryStorageEnabled();
+        if (eventMeshServerRetryStorageEnabled) {
+            if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+            }
+            Integer maxTimesPerEvent = STORAGE_RETRY_TIMES_MAP.get(event.getId());
+            if (maxTimesPerEvent < 1) {
+                sendMessageBack(event);
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+            } else {
+                getHandleMessageContext().finish();
Review Comment:
I don't understand `maxTimesPerEvent`. When `maxTimesPerEvent < 1`, the message
is sent to the MQ's retry queue. When `maxTimesPerEvent ≥ 1`, it is handled
directly as a successful consumption. What does this variable record?
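For reference, this is how I read the branch above, reduced to a minimal sketch. `RetryCounterSketch`, `sentBackTimes`, and the recorded action strings are hypothetical names of mine; the real code calls `sendMessageBack(event)` and `getHandleMessageContext().finish()` and keeps its counts in `STORAGE_RETRY_TIMES_MAP`. If this reading is right, the value is a count of how many times the event has already been sent back, not a maximum.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of the counter logic in RetryContext.run(); not the PR's code.
class RetryCounterSketch {

    // Event id -> number of times this event has already been sent back to storage.
    private final Map<String, Integer> sentBackTimes = new HashMap<>();

    // Records the branch taken, standing in for sendMessageBack(...) and finish().
    final List<String> actions = new ArrayList<>();

    void run(String eventId) {
        int alreadySentBack = sentBackTimes.getOrDefault(eventId, 0);
        if (alreadySentBack < 1) {
            // First failure for this event: hand it to the storage retry topic once.
            actions.add("sendMessageBack:" + eventId);
            sentBackTimes.put(eventId, alreadySentBack + 1);
        } else {
            // Every later run for the same event id is treated as finished.
            actions.add("finish:" + eventId);
        }
    }

    public static void main(String[] args) {
        RetryCounterSketch sketch = new RetryCounterSketch();
        sketch.run("event-1");
        sketch.run("event-1");
        sketch.run("event-2");
        // prints [sendMessageBack:event-1, finish:event-1, sendMessageBack:event-2]
        System.out.println(sketch.actions);
    }
}
```

Read this way, a name such as `sentBackTimes` would describe the value better than `maxTimesPerEvent`, and the threshold `1` looks like it should be a named limit.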
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]