This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 13f4d98ff [ISSUE#4178] When there are a large number of errors in the
HTTP target, RocketMQ consumption is slow (#4544)
13f4d98ff is described below
commit 13f4d98ff9374c5a036eaf608dd8ef36017aa7d5
Author: yanrongzhen <[email protected]>
AuthorDate: Tue Nov 14 18:03:39 2023 +0800
[ISSUE#4178] When there are a large number of errors in the HTTP target,
RocketMQ consumption is slow (#4544)
* Provide MQ-storage-based retry strategy in http protocol.
* Remove unused method.
* fix
* fix
* Add retry, rocketmq-retry module.
* Do some optimization.
* Use interface instead of HttpHandleMsgContext.
* optimize
* fix
* Remove unused methods.
* fix log
* Remove retry-rocketmq dependency.
* rollback dependency changes.
* Use plugin to load spi.
* Remove unused methods.
* fix: remove storage dependency
* fix: log
* fix: change 'standalone' to 'default'
* fix: codereview
---
.../org/apache/eventmesh/common/Constants.java | 2 +
.../common/config/CommonConfiguration.java | 4 +
.../common/protocol/http/common/ProtocolKey.java | 1 +
.../eventmesh/common/utils/ReflectUtils.java | 3 +
.../eventmesh-retry-api/build.gradle | 29 ++--
.../eventmesh/retry/api}/AbstractRetryer.java | 9 +-
.../org/apache/eventmesh/retry/api}/Retryer.java | 4 +-
.../retry/api/conf/RetryConfiguration.java | 26 ++--
.../retry/api/strategy/RetryStrategy.java | 24 ++--
.../retry/api}/timer/HashedWheelTimer.java | 4 +-
.../apache/eventmesh/retry/api}/timer/Timeout.java | 2 +-
.../apache/eventmesh/retry/api}/timer/Timer.java | 2 +-
.../eventmesh/retry/api}/timer/TimerTask.java | 6 +-
.../eventmesh-retry-rocketmq/build.gradle | 47 +++++++
.../eventmesh-retry-rocketmq/gradle.properties | 19 +++
.../retry/rocketmq/RocketMQRetryStrategyImpl.java | 75 ++++++++++
...ache.eventmesh.retry.api.strategy.RetryStrategy | 16 +++
eventmesh-runtime/build.gradle | 2 +
eventmesh-runtime/conf/eventmesh.properties | 2 +
.../eventmesh/runtime/boot/AbstractHTTPServer.java | 6 +
.../runtime/boot/AbstractRemotingServer.java | 19 ++-
.../eventmesh/runtime/boot/AbstractTCPServer.java | 6 +
.../runtime/boot/EventMeshGrpcServer.java | 19 ++-
.../runtime/boot/EventMeshHTTPServer.java | 2 +-
.../Retryer.java => boot/RemotingServer.java} | 21 +--
.../configuration/EventMeshHTTPConfiguration.java | 1 +
.../runtime/core/protocol/RetryContext.java | 85 +++++++++--
.../consumer/HandleMessageContext.java} | 21 +--
.../protocol/grpc/consumer/EventMeshConsumer.java | 4 +-
.../processor/BatchPublishCloudEventProcessor.java | 6 +-
.../processor/PublishCloudEventsProcessor.java | 6 +-
.../grpc/processor/ReplyMessageProcessor.java | 6 +-
.../grpc/processor/RequestCloudEventProcessor.java | 6 +-
.../protocol/grpc/producer/ProducerManager.java | 90 ------------
.../protocol/grpc/push/AbstractPushRequest.java | 3 +-
.../core/protocol/grpc/retry/GrpcRetryer.java | 2 +-
.../protocol/http/consumer/EventMeshConsumer.java | 13 +-
.../protocol/http/consumer/HandleMsgContext.java | 3 +-
.../http/processor/BatchSendMessageProcessor.java | 5 +-
.../processor/BatchSendMessageV2Processor.java | 4 +-
.../http/processor/ReplyMessageProcessor.java | 4 +-
.../http/processor/SendAsyncEventProcessor.java | 4 +-
.../http/processor/SendAsyncMessageProcessor.java | 4 +-
.../processor/SendAsyncRemoteEventProcessor.java | 4 +-
.../http/processor/SendSyncMessageProcessor.java | 4 +-
.../protocol/http/producer/EventMeshProducer.java | 130 -----------------
.../protocol/http/producer/SendMessageContext.java | 156 ---------------------
.../http/push/AbstractHTTPPushRequest.java | 10 +-
.../protocol/http/push/AsyncHTTPPushRequest.java | 8 +-
.../core/protocol/http/retry/HttpRetryer.java | 2 +-
.../{grpc => }/producer/EventMeshProducer.java | 25 +++-
.../{http => }/producer/ProducerManager.java | 20 +--
.../{grpc => }/producer/SendMessageContext.java | 13 +-
.../client/session/push/DownStreamMsgContext.java | 3 +-
.../tcp/client/session/retry/TcpRetryer.java | 4 +-
.../client/session/send/UpStreamMsgContext.java | 3 +-
.../eventmesh/spi/EventMeshExtensionType.java | 1 +
.../org/apache/eventmesh/api/TopicNameHelper.java | 26 ++--
.../rocketmq/common/TopicNameHelperImpl.java | 21 +--
.../org.apache.eventmesh.api.TopicNameHelper | 16 +++
settings.gradle | 4 +
61 files changed, 473 insertions(+), 594 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
index eaf595659..3bc3cbd7e 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
@@ -204,4 +204,6 @@ public class Constants {
public static final String OS_WIN_PREFIX = "win";
+ public static final String DEFAULT = "default";
+
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 17f1c5b78..47f52de4c 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.common.config;
import static org.apache.eventmesh.common.Constants.HTTP;
+import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.commons.collections4.CollectionUtils;
@@ -111,6 +112,9 @@ public class CommonConfiguration {
@ConfigFiled(reload = true)
private String meshGroup;
+ @ConfigFiled(field = "server.retry.plugin.type")
+ private String eventMeshRetryPluginType = Constants.DEFAULT;
+
public void reload() {
this.eventMeshWebhookOrigin = "eventmesh." + eventMeshIDC;
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
index 636c5035f..71ad6eef0 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
@@ -32,6 +32,7 @@ public class ProtocolKey {
public static final String PROTOCOL_VERSION = "protocolversion";
public static final String PROTOCOL_DESC = "protocoldesc";
+ public static final String TOPIC = "topic";
public static final String CONTENT_TYPE = "contenttype";
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ReflectUtils.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ReflectUtils.java
index 44fac016a..7dd120568 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ReflectUtils.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ReflectUtils.java
@@ -20,6 +20,9 @@ package org.apache.eventmesh.common.utils;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
public class ReflectUtils {
/**
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
b/eventmesh-retry/eventmesh-retry-api/build.gradle
similarity index 64%
copy from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to eventmesh-retry/eventmesh-retry-api/build.gradle
index b004c6aab..228d23f59 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ b/eventmesh-retry/eventmesh-retry-api/build.gradle
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.retry;
+dependencies {
+ implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
+ implementation project(":eventmesh-common")
+ implementation project(":eventmesh-spi")
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+ implementation 'io.cloudevents:cloudevents-core'
+ implementation 'io.cloudevents:cloudevents-json-jackson'
-import java.util.concurrent.TimeUnit;
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
-/**
- * Retryer interface.
- */
-public interface Retryer {
-
- void start();
-
- void shutdown();
-
- long getPendingTimeouts();
-
- void printState();
-
- void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
-
-}
+ testImplementation 'org.junit.jupiter:junit-jupiter'
+}
\ No newline at end of file
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/AbstractRetryer.java
similarity index 90%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
rename to
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/AbstractRetryer.java
index 8f1491346..0b6ddf844 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
+++
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/AbstractRetryer.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.protocol;
+package org.apache.eventmesh.retry.api;
import org.apache.eventmesh.common.EventMeshThreadFactory;
-import org.apache.eventmesh.runtime.core.retry.Retryer;
-import org.apache.eventmesh.runtime.core.timer.HashedWheelTimer;
-import org.apache.eventmesh.runtime.core.timer.Timer;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.retry.api.timer.HashedWheelTimer;
+import org.apache.eventmesh.retry.api.timer.Timer;
+import org.apache.eventmesh.retry.api.timer.TimerTask;
import java.text.SimpleDateFormat;
import java.util.Date;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/Retryer.java
similarity index 90%
copy from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/Retryer.java
index b004c6aab..82634e173 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/Retryer.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.retry.api;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.retry.api.timer.TimerTask;
import java.util.concurrent.TimeUnit;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/conf/RetryConfiguration.java
similarity index 64%
copy from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/conf/RetryConfiguration.java
index b004c6aab..351a06fd3 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/conf/RetryConfiguration.java
@@ -15,25 +15,27 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.retry.api.conf;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.api.producer.Producer;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
-import java.util.concurrent.TimeUnit;
+import io.cloudevents.CloudEvent;
-/**
- * Retryer interface.
- */
-public interface Retryer {
+import lombok.Builder;
+import lombok.Data;
- void start();
+@Data
+@Builder
+public class RetryConfiguration {
- void shutdown();
+ private CloudEvent event;
- long getPendingTimeouts();
+ private String consumerGroupName;
- void printState();
+ private Producer producer;
- void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+ private String topic;
+ private SubscriptionMode subscriptionMode;
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/strategy/RetryStrategy.java
similarity index 66%
copy from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/strategy/RetryStrategy.java
index b004c6aab..739be171b 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/strategy/RetryStrategy.java
@@ -15,25 +15,17 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.retry.api.strategy;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
/**
- * Retryer interface.
+ * Retry strategy.
*/
-public interface Retryer {
-
- void start();
-
- void shutdown();
-
- long getPendingTimeouts();
-
- void printState();
-
- void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.RETRY)
+public interface RetryStrategy {
+ void retry(RetryConfiguration configuration);
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/HashedWheelTimer.java
similarity index 99%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
rename to
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/HashedWheelTimer.java
index 443f0a85f..e71abe61a 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
+++
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/HashedWheelTimer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.timer;
+package org.apache.eventmesh.retry.api.timer;
import static org.apache.eventmesh.common.Constants.OS_NAME_KEY;
import static org.apache.eventmesh.common.Constants.OS_WIN_PREFIX;
@@ -632,7 +632,7 @@ public class HashedWheelTimer implements Timer {
}
try {
- task.run(this);
+ task.run();
task.setExecuteTimeHook(System.currentTimeMillis());
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timeout.java
similarity index 97%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
rename to
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timeout.java
index 12cea1aea..e6485ff51 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
+++
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timeout.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.timer;
+package org.apache.eventmesh.retry.api.timer;
/**
* A handle associated with a {@link TimerTask} that is returned by a
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timer.java
similarity index 97%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
rename to
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timer.java
index 076f8d04b..50d1ce06a 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
+++
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.timer;
+package org.apache.eventmesh.retry.api.timer;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/TimerTask.java
similarity index 88%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
rename to
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/TimerTask.java
index ae8447f04..879481454 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
+++
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/TimerTask.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.timer;
+package org.apache.eventmesh.retry.api.timer;
import java.util.concurrent.TimeUnit;
@@ -28,10 +28,8 @@ public interface TimerTask {
/**
* Executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
- *
- * @param timeout a handle which is associated with this task
*/
- void run(Timeout timeout) throws Exception;
+ void run() throws Exception;
/**
* Hook method to set the execute time.
diff --git a/eventmesh-retry/eventmesh-retry-rocketmq/build.gradle
b/eventmesh-retry/eventmesh-retry-rocketmq/build.gradle
new file mode 100644
index 000000000..3d33929b4
--- /dev/null
+++ b/eventmesh-retry/eventmesh-retry-rocketmq/build.gradle
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+List rocketmq = [
+ "org.apache.rocketmq:rocketmq-client:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-common:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-store:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
+]
+
+dependencies {
+ implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
+ implementation project(":eventmesh-storage-plugin:eventmesh-storage-rocketmq")
+ implementation rocketmq
+ implementation project(":eventmesh-retry:eventmesh-retry-api")
+ implementation project(":eventmesh-common")
+
+ implementation 'io.cloudevents:cloudevents-core'
+ implementation 'io.cloudevents:cloudevents-json-jackson'
+
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
+
+ testImplementation 'org.junit.jupiter:junit-jupiter'
+}
\ No newline at end of file
diff --git a/eventmesh-retry/eventmesh-retry-rocketmq/gradle.properties
b/eventmesh-retry/eventmesh-retry-rocketmq/gradle.properties
new file mode 100644
index 000000000..5114e0523
--- /dev/null
+++ b/eventmesh-retry/eventmesh-retry-rocketmq/gradle.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+rocketmq_version=4.9.5
+pluginType=retry
+pluginName=rocketmq
\ No newline at end of file
diff --git
a/eventmesh-retry/eventmesh-retry-rocketmq/src/main/java/org/apache/eventmesh/retry/rocketmq/RocketMQRetryStrategyImpl.java
b/eventmesh-retry/eventmesh-retry-rocketmq/src/main/java/org/apache/eventmesh/retry/rocketmq/RocketMQRetryStrategyImpl.java
new file mode 100644
index 000000000..9d817eec5
--- /dev/null
+++
b/eventmesh-retry/eventmesh-retry-rocketmq/src/main/java/org/apache/eventmesh/retry/rocketmq/RocketMQRetryStrategyImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.retry.rocketmq;
+
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.api.SendResult;
+import org.apache.eventmesh.api.exception.OnExceptionContext;
+import org.apache.eventmesh.api.producer.Producer;
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
+import org.apache.eventmesh.retry.api.strategy.RetryStrategy;
+
+import org.apache.rocketmq.common.MixAll;
+
+import java.util.Objects;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RocketMQRetryStrategyImpl implements RetryStrategy {
+
+ @Override
+ public void retry(RetryConfiguration configuration) {
+ sendMessageBack(configuration);
+ }
+
+ @SneakyThrows
+ private void sendMessageBack(final RetryConfiguration configuration) {
+ CloudEvent event = configuration.getEvent();
+ String topic = configuration.getTopic();
+ String consumerGroupName = configuration.getConsumerGroupName();
+ String retryTopicName = MixAll.getRetryTopic(consumerGroupName);
+
+ String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
+ String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
+ CloudEvent retryEvent = CloudEventBuilder.from(event)
+ .withExtension(ProtocolKey.TOPIC, topic)
+ .withSubject(retryTopicName)
+ .build();
+ Producer producer = configuration.getProducer();
+ producer.publish(retryEvent, new SendCallback() {
+
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ log.info("consumer:{} consume success,, bizSeqno:{}, uniqueId:{}",
+ consumerGroupName, bizSeqNo, uniqueId);
+ }
+
+ @Override
+ public void onException(OnExceptionContext context) {
+ log.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
+ consumerGroupName, bizSeqNo, uniqueId, context.getException());
+ }
+ });
+ }
+}
diff --git
a/eventmesh-retry/eventmesh-retry-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy
b/eventmesh-retry/eventmesh-retry-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy
new file mode 100644
index 000000000..71c2006e9
--- /dev/null
+++
b/eventmesh-retry/eventmesh-retry-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketmq=org.apache.eventmesh.retry.rocketmq.RocketMQRetryStrategyImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 54f953369..a7fc97018 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -72,6 +72,8 @@ dependencies {
implementation project(":eventmesh-webhook:eventmesh-webhook-api")
implementation project(":eventmesh-webhook:eventmesh-webhook-receive")
+ implementation project(":eventmesh-retry:eventmesh-retry-api")
+
testImplementation "org.mockito:mockito-inline"
testImplementation "org.mockito:mockito-junit-jupiter"
testImplementation "commons-io:commons-io"
diff --git a/eventmesh-runtime/conf/eventmesh.properties
b/eventmesh-runtime/conf/eventmesh.properties
index de4150fba..cabe3f9bc 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -55,6 +55,8 @@ eventMesh.server.retry.sync.pushRetryTimes=3
eventMesh.server.retry.async.pushRetryDelayInMills=500
eventMesh.server.retry.sync.pushRetryDelayInMills=500
eventMesh.server.retry.pushRetryQueueSize=10000
+eventMesh.server.retry.plugin.type=default
+
# runtime admin
eventMesh.server.admin.http.port=10106
# metaStorage
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index a8d9e41e6..da33f9852 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -18,6 +18,7 @@
package org.apache.eventmesh.runtime.boot;
import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
@@ -149,6 +150,11 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
httpThreadPoolGroup.initThreadPool();
}
+ @Override
+ public CommonConfiguration getConfiguration() {
+ return eventMeshHttpConfiguration;
+ }
+
@Override
public void start() throws Exception {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index 0a876c413..7f93e1260 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -21,6 +21,7 @@ import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.common.utils.SystemUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import java.util.concurrent.TimeUnit;
@@ -36,7 +37,7 @@ import lombok.extern.slf4j.Slf4j;
* The most basic server
*/
@Slf4j
-public abstract class AbstractRemotingServer {
+public abstract class AbstractRemotingServer implements RemotingServer {
private static final int MAX_THREADS =
Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_SLEEP_SECONDS = 30;
@@ -44,6 +45,7 @@ public abstract class AbstractRemotingServer {
private EventLoopGroup bossGroup;
private EventLoopGroup ioGroup;
private EventExecutorGroup workerGroup;
+ protected ProducerManager producerManager;
private int port;
@@ -68,19 +70,32 @@ public abstract class AbstractRemotingServer {
workerGroup = new NioEventLoopGroup(MAX_THREADS, new
EventMeshThreadFactory(threadPrefix + "-worker"));
}
+ protected void initProducerManager() throws Exception {
+ producerManager = new ProducerManager(this);
+ producerManager.init();
+ }
+
+ public ProducerManager getProducerManager() {
+ return producerManager;
+ }
+
public void init(final String threadPrefix) throws Exception {
buildBossGroup(threadPrefix);
buildIOGroup(threadPrefix);
buildWorkerGroup(threadPrefix);
+ initProducerManager();
}
- public abstract void start() throws Exception;
+ public void start() throws Exception {
+ producerManager.start();
+ }
public void shutdown() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
LogUtils.info(log, "shutdown bossGroup");
}
+ producerManager.shutdown();
ThreadUtils.randomPause(TimeUnit.SECONDS.toMillis(DEFAULT_SLEEP_SECONDS));
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
index f6ae561c7..708f0000e 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.boot;
+import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
@@ -111,6 +112,11 @@ public class AbstractTCPServer extends
AbstractRemotingServer {
tcpThreadPoolGroup.initThreadPool();
}
+ @Override
+ public CommonConfiguration getConfiguration() {
+ return eventMeshTCPConfiguration;
+ }
+
@Override
public void start() throws Exception {
initSharableHandlers();
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
index 1ee3d69da..58832d1d3 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
@@ -22,6 +22,7 @@ import static org.apache.eventmesh.common.Constants.GRPC;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.metrics.api.MetricsPluginFactory;
@@ -30,7 +31,6 @@ import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
import org.apache.eventmesh.runtime.core.protocol.grpc.retry.GrpcRetryer;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ConsumerService;
import
org.apache.eventmesh.runtime.core.protocol.grpc.service.HeartbeatService;
@@ -60,7 +60,7 @@ import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class EventMeshGrpcServer {
+public class EventMeshGrpcServer extends AbstractRemotingServer {
private final EventMeshGrpcConfiguration eventMeshGrpcConfiguration;
@@ -70,8 +70,6 @@ public class EventMeshGrpcServer {
private Server server;
- private ProducerManager producerManager;
-
private ConsumerManager consumerManager;
private GrpcRetryer grpcRetryer;
@@ -112,9 +110,7 @@ public class EventMeshGrpcServer {
msgRateLimiter =
RateLimiter.create(eventMeshGrpcConfiguration.getEventMeshMsgReqNumPerSecond());
- producerManager = new ProducerManager(this);
- producerManager.init();
-
+ initProducerManager();
consumerManager = new ConsumerManager(this);
consumerManager.init();
@@ -134,6 +130,11 @@ public class EventMeshGrpcServer {
log.info("-----------------EventMeshGRPCServer initialized");
}
+ @Override
+ public CommonConfiguration getConfiguration() {
+ return eventMeshGrpcConfiguration;
+ }
+
public void start() throws Exception {
log.info("---------------EventMeshGRPCServer
starting-------------------");
@@ -207,10 +208,6 @@ public class EventMeshGrpcServer {
return this.eventMeshGrpcConfiguration;
}
- public ProducerManager getProducerManager() {
- return producerManager;
- }
-
public ConsumerManager getConsumerManager() {
return consumerManager;
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 2cfe2d28d..cff5a6e69 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -52,9 +52,9 @@ import
org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessage
import
org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor;
import
org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor;
import
org.apache.eventmesh.runtime.core.protocol.http.processor.WebHookProcessor;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager;
import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPClientPool;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.webhook.receive.WebHookController;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/RemotingServer.java
similarity index 69%
copy from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/RemotingServer.java
index b004c6aab..764075cc6 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/RemotingServer.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.runtime.boot;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.eventmesh.common.config.CommonConfiguration;
/**
- * Retryer interface.
+ * Remoting server interface.
*/
-public interface Retryer {
-
- void start();
-
- void shutdown();
-
- long getPendingTimeouts();
-
- void printState();
+public interface RemotingServer {
- void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+ void init() throws Exception;
+ CommonConfiguration getConfiguration();
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
index c266a26bd..a7ed9b26f 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
@@ -131,4 +131,5 @@ public class EventMeshHTTPConfiguration extends
CommonConfiguration {
@ConfigFiled(field = "blacklist.ipv6")
private List<IPAddress> eventMeshIpv6BlackList = Collections.emptyList();
+
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
index 754ac7cfc..ca796dc33 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
@@ -17,43 +17,98 @@
package org.apache.eventmesh.runtime.core.protocol;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
-import org.apache.eventmesh.runtime.core.timer.Timer;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
+import org.apache.eventmesh.retry.api.strategy.RetryStrategy;
+import org.apache.eventmesh.retry.api.timer.TimerTask;
+import org.apache.eventmesh.runtime.core.protocol.consumer.HandleMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
import io.cloudevents.CloudEvent;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public abstract class RetryContext implements TimerTask {
+ private static final Set<String> RETRY_STRATEGY_PROCESSED_EVENT_LIST = Collections.synchronizedSet(new HashSet<>());
+
public CloudEvent event;
public String seq;
public int retryTimes;
+ public CommonConfiguration commonConfiguration;
+
public long executeTime = System.currentTimeMillis();
- public long getExecuteTime() {
- return executeTime;
+ public void setEvent(CloudEvent event) {
+ this.event = event;
+ }
+
+ @Override
+ public void setExecuteTimeHook(long executeTime) {
+ this.executeTime = executeTime;
}
- protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
- if (timeout == null) {
+ @Override
+ public final void run() throws Exception {
+ String eventMeshRetryPluginType = Optional.ofNullable(commonConfiguration.getEventMeshRetryPluginType())
+ .orElse(Constants.DEFAULT);
+ if (Constants.DEFAULT.equals(eventMeshRetryPluginType)) {
+ log.warn("Because eventmesh retry plugin is default, retry in memory.");
+ doRun();
return;
}
-
- Timer timer = timeout.timer();
- if (timer.isStop() || timeout.isCancelled()) {
+ if (!eventMeshRetryPluginType.equals(commonConfiguration.getEventMeshStoragePluginType())) {
+ log.warn("Because eventmesh retry plugin type mismatched with storage plugin type, retry in memory.");
+ doRun();
return;
}
+ Optional<RetryStrategy> retryStrategy = Optional.ofNullable(
+ EventMeshExtensionFactory.getExtension(RetryStrategy.class,
+ commonConfiguration.getEventMeshRetryPluginType()));
+ if (!retryStrategy.isPresent()) {
+ log.warn("Storage retry SPI not found, retry in memory.");
+ doRun();
+ return;
+ }
+ if (!RETRY_STRATEGY_PROCESSED_EVENT_LIST.contains(event.getId())) {
+ String consumerGroupName = getHandleMessageContext().getConsumerGroup();
+ EventMeshProducer producer = getProducerManager().getEventMeshProducer(consumerGroupName);
+ RetryConfiguration retryConfiguration = RetryConfiguration.builder()
+ .event(event)
+ .consumerGroupName(consumerGroupName)
+ .producer(producer.getMqProducerWrapper().getMeshMQProducer())
+ .topic(getHandleMessageContext().getTopic())
+ .build();
+ retryStrategy.get().retry(retryConfiguration);
+ RETRY_STRATEGY_PROCESSED_EVENT_LIST.add(event.getId());
+ } else {
+ RETRY_STRATEGY_PROCESSED_EVENT_LIST.remove(event.getId());
+ getHandleMessageContext().finish();
+ }
+ }
- timer.newTimeout(timeout.task(), tick, timeUnit);
+ protected HandleMessageContext getHandleMessageContext() throws Exception {
+ throw new IllegalAccessException("method not supported.");
}
- @Override
- public void setExecuteTimeHook(long executeTime) {
- this.executeTime = executeTime;
+ public abstract void doRun() throws Exception;
+
+ @SneakyThrows
+ protected ProducerManager getProducerManager() {
+ throw new IllegalAccessException("method not supported.");
}
+
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/consumer/HandleMessageContext.java
similarity index 69%
copy from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/consumer/HandleMessageContext.java
index b004c6aab..6ba20f376 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/consumer/HandleMessageContext.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.retry;
-
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
-
-import java.util.concurrent.TimeUnit;
+package org.apache.eventmesh.runtime.core.protocol.consumer;
/**
- * Retryer interface.
+ * Handle message context
*/
-public interface Retryer {
-
- void start();
-
- void shutdown();
-
- long getPendingTimeouts();
+public interface HandleMessageContext {
- void printState();
+ void finish();
- void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+ String getTopic();
+ String getConsumerGroup();
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
index c8567a9e6..3a4fde027 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
@@ -48,10 +48,10 @@ import
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.Co
import
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig;
import
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.WebhookTopicConfig;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.HandleMsgContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.commons.collections4.MapUtils;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java
index 80a14e435..d912d82ac 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java
@@ -30,11 +30,11 @@ import
org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import java.util.List;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java
index eb9189bff..996118f29 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java
@@ -28,11 +28,11 @@ import
org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import lombok.extern.slf4j.Slf4j;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java
index 73c594b54..e7eecbed3 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java
@@ -34,11 +34,11 @@ import
org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import java.util.concurrent.TimeUnit;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java
index ecb50f499..1e934ff85 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java
@@ -26,11 +26,11 @@ import
org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
-import
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import lombok.extern.slf4j.Slf4j;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/ProducerManager.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/ProducerManager.java
deleted file mode 100644
index 623c50d88..000000000
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/ProducerManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.grpc.producer;
-
-import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import org.apache.eventmesh.runtime.common.ServiceState;
-import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ProducerManager {
-
- private EventMeshGrpcServer eventMeshGrpcServer;
-
- private final ConcurrentHashMap<String, EventMeshProducer> producerTable =
new ConcurrentHashMap<>();
-
- public ProducerManager(EventMeshGrpcServer eventMeshGrpcServer) {
- this.eventMeshGrpcServer = eventMeshGrpcServer;
- }
-
- public void init() throws Exception {
- log.info("Grpc ProducerManager inited......");
- }
-
- public void start() throws Exception {
- log.info("Grpc ProducerManager started......");
- }
-
- public EventMeshProducer getEventMeshProducer(String producerGroup) throws
Exception {
- EventMeshProducer eventMeshProducer = null;
- if (!producerTable.containsKey(producerGroup)) {
- synchronized (producerTable) {
- if (!producerTable.containsKey(producerGroup)) {
- ProducerGroupConf producerGroupConfig = new
ProducerGroupConf(producerGroup);
- eventMeshProducer =
createEventMeshProducer(producerGroupConfig);
- eventMeshProducer.start();
- }
- }
- }
-
- eventMeshProducer = producerTable.get(producerGroup);
-
- if (ServiceState.RUNNING != eventMeshProducer.getStatus()) {
- eventMeshProducer.start();
- }
-
- return eventMeshProducer;
- }
-
- private synchronized EventMeshProducer createEventMeshProducer(
- ProducerGroupConf producerGroupConfig) throws Exception {
- if (producerTable.containsKey(producerGroupConfig.getGroupName())) {
- return producerTable.get(producerGroupConfig.getGroupName());
- }
- EventMeshProducer eventMeshProducer = new EventMeshProducer();
-
eventMeshProducer.init(eventMeshGrpcServer.getEventMeshGrpcConfiguration(),
- producerGroupConfig);
- producerTable.put(producerGroupConfig.getGroupName(),
eventMeshProducer);
- return eventMeshProducer;
- }
-
- public void shutdown() {
- for (EventMeshProducer eventMeshProducer : producerTable.values()) {
- try {
- eventMeshProducer.shutdown();
- } catch (Exception ex) {
- log.error("shutdown eventMeshProducer[{}] err",
eventMeshProducer, ex);
- }
- }
- log.info("producerManager shutdown......");
- }
-}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
index f9f4a23ee..54056c3a1 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
@@ -32,7 +32,6 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer;
import org.apache.eventmesh.runtime.core.protocol.grpc.retry.GrpcRetryer;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
import java.util.Collections;
import java.util.Map;
@@ -158,7 +157,7 @@ public abstract class AbstractPushRequest extends
RetryContext {
}
@Override
- public void run(Timeout timeout) throws Exception {
+ public void doRun() throws Exception {
tryPushRequest();
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
index 6493bfdeb..dccd782c1 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
@@ -17,9 +17,9 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.retry;
+import org.apache.eventmesh.retry.api.AbstractRetryer;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
-import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
import lombok.extern.slf4j.Slf4j;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 5177e8c58..bccd6e9f5 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -28,6 +28,7 @@ import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
+import org.apache.eventmesh.api.TopicNameHelper;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
@@ -37,18 +38,20 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPMessageHandler;
import org.apache.eventmesh.runtime.core.protocol.http.push.MessageHandler;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.TraceUtils;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import org.apache.commons.collections4.MapUtils;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,6 +112,9 @@ public class EventMeshConsumer {
EventMeshUtil.getCloudEventExtensionMap(protocolVersion,
event),
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
+ Optional<TopicNameHelper> topicNameHelper =
+ Optional.ofNullable(EventMeshExtensionFactory.getExtension(TopicNameHelper.class,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType()));
String topic = event.getSubject();
String bizSeqNo =
Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
String uniqueId =
Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
@@ -124,6 +130,9 @@ public class EventMeshConsumer {
messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}",
topic, bizSeqNo, uniqueId);
}
+ if (topicNameHelper.isPresent() && topicNameHelper.get().isRetryTopic(topic)) {
+ topic = String.valueOf(event.getExtension(ProtocolKey.TOPIC));
+ }
ConsumerGroupTopicConf currentTopicConfig =
MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
(EventMeshAsyncConsumeContext) context;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
index e6af880a9..c4046827a 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
@@ -24,6 +24,7 @@ import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.protocol.consumer.HandleMessageContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
@@ -41,7 +42,7 @@ import io.cloudevents.CloudEvent;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class HandleMsgContext {
+public class HandleMsgContext implements HandleMessageContext {
public static final Logger MESSAGE_LOGGER =
LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 9fc1c0b4a..1606a8c7a 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -42,8 +42,8 @@ import
org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.commons.collections4.CollectionUtils;
@@ -246,7 +246,6 @@ public class BatchSendMessageProcessor implements
HttpRequestProcessor {
CloudEvent event = null;
// TODO: Detect the maximum length of messages for different
producers.
final SendMessageContext sendMessageContext = new
SendMessageContext(batchId, event, batchEventMeshProducer, eventMeshHTTPServer);
- sendMessageContext.setEventList(eventlist);
batchEventMeshProducer.send(sendMessageContext, new
SendCallback() {
@Override
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index a390cfccb..41e9ab262 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -41,8 +41,8 @@ import
org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index 1c8ac67b3..510bb2054 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -41,8 +41,8 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index 22e83c1cf..cb96e5433 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -38,8 +38,8 @@ import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.common.EventMeshTrace;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index eb327e7c7..153c607fb 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -42,8 +42,8 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.TraceUtils;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
index 909872038..3f59234bd 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
@@ -37,8 +37,8 @@ import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.common.EventMeshTrace;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index e8e712aef..44dcdff50 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -41,8 +41,8 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
deleted file mode 100644
index 5e7ab7206..000000000
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.http.producer;
-
-import org.apache.eventmesh.api.RequestReplyCallback;
-import org.apache.eventmesh.api.SendCallback;
-import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
-import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
-import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class EventMeshProducer {
-
- protected AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
-
- protected AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE);
-
- public AtomicBoolean getInited() {
- return inited;
- }
-
- public AtomicBoolean getStarted() {
- return started;
- }
-
- public boolean isStarted() {
- return started.get();
- }
-
- protected ProducerGroupConf producerGroupConfig;
-
- protected EventMeshHTTPConfiguration eventMeshHttpConfiguration;
-
- protected MQProducerWrapper mqProducerWrapper;
-
- public void send(SendMessageContext sendMsgContext, SendCallback
sendCallback) throws Exception {
- mqProducerWrapper.send(sendMsgContext.getEvent(), sendCallback);
- }
-
- public void request(SendMessageContext sendMsgContext,
RequestReplyCallback rrCallback, long timeout)
- throws Exception {
- mqProducerWrapper.request(sendMsgContext.getEvent(), rrCallback,
timeout);
- }
-
- public boolean reply(final SendMessageContext sendMsgContext, final
SendCallback sendCallback) throws Exception {
- mqProducerWrapper.reply(sendMsgContext.getEvent(), sendCallback);
- return true;
- }
-
- public MQProducerWrapper getMqProducerWrapper() {
- return mqProducerWrapper;
- }
-
- public void init(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
- ProducerGroupConf producerGroupConfig) throws Exception {
- if (!inited.compareAndSet(false, true)) {
- return;
- }
- this.producerGroupConfig = producerGroupConfig;
- this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
-
- Properties keyValue = new Properties();
- keyValue.put("producerGroup", producerGroupConfig.getGroupName());
- keyValue.put("instanceName",
EventMeshUtil.buildMeshClientID(producerGroupConfig.getGroupName(),
- eventMeshHttpConfiguration.getEventMeshCluster()));
- if (StringUtils.isNotBlank(producerGroupConfig.getToken())) {
- keyValue.put(Constants.PRODUCER_TOKEN,
producerGroupConfig.getToken());
- }
-
- // TODO for defibus
- keyValue.put("eventMeshIDC",
eventMeshHttpConfiguration.getEventMeshIDC());
- mqProducerWrapper = new
MQProducerWrapper(eventMeshHttpConfiguration.getEventMeshStoragePluginType());
- mqProducerWrapper.init(keyValue);
- log.info("EventMeshProducer [{}] inited.............",
producerGroupConfig.getGroupName());
- }
-
- public void start() throws Exception {
-
- if (!started.compareAndSet(false, true)) {
- return;
- }
- mqProducerWrapper.start();
- log.info("EventMeshProducer [{}] started.............",
producerGroupConfig.getGroupName());
- }
-
- public void shutdown() throws Exception {
- if (!inited.compareAndSet(true, false)) {
- return;
- }
- if (!started.compareAndSet(true, false)) {
- return;
- }
- mqProducerWrapper.shutdown();
- log.info("EventMeshProducer [{}] shutdown.............",
producerGroupConfig.getGroupName());
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("eventMeshProducer={")
- .append("inited=").append(inited.get()).append(",")
- .append("started=").append(started.get()).append(",")
-
.append("producerGroupConfig=").append(producerGroupConfig).append("}");
- return sb.toString();
- }
-}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
deleted file mode 100644
index 0e262aa68..000000000
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.http.producer;
-
-import org.apache.eventmesh.api.SendCallback;
-import org.apache.eventmesh.api.SendResult;
-import org.apache.eventmesh.api.exception.OnExceptionContext;
-import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.core.protocol.RetryContext;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
-
-import org.apache.commons.lang3.time.DateFormatUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import io.cloudevents.CloudEvent;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class SendMessageContext extends RetryContext {
-
- private CloudEvent event;
-
- private String bizSeqNo;
-
- private EventMeshProducer eventMeshProducer;
-
- private long createTime = System.currentTimeMillis();
-
- private Map<String, String> props;
-
- public EventMeshHTTPServer eventMeshHTTPServer;
-
- private List<CloudEvent> eventList;
-
- public SendMessageContext(String bizSeqNo, CloudEvent event,
EventMeshProducer eventMeshProducer, EventMeshHTTPServer eventMeshHTTPServer) {
- this.bizSeqNo = bizSeqNo;
- this.event = event;
- this.eventMeshProducer = eventMeshProducer;
- this.eventMeshHTTPServer = eventMeshHTTPServer;
- }
-
- public void addProp(String key, String val) {
- if (props == null) {
- props = new HashMap<>();
- }
- props.put(key, val);
- }
-
- public String getProp(String key) {
- return props.get(key);
- }
-
- public String getBizSeqNo() {
- return bizSeqNo;
- }
-
- public void setBizSeqNo(String bizSeqNo) {
- this.bizSeqNo = bizSeqNo;
- }
-
- public CloudEvent getEvent() {
- return event;
- }
-
- public void setEvent(CloudEvent event) {
- this.event = event;
- }
-
- public EventMeshProducer getEventMeshProducer() {
- return eventMeshProducer;
- }
-
- public void setEventMeshProducer(EventMeshProducer eventMeshProducer) {
- this.eventMeshProducer = eventMeshProducer;
- }
-
- public long getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(long createTime) {
- this.createTime = createTime;
- }
-
- public List<CloudEvent> getEventList() {
- return eventList;
- }
-
- public void setEventList(List<CloudEvent> eventList) {
- this.eventList = eventList;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("sendMessageContext={")
- .append("bizSeqNo=").append(bizSeqNo)
- .append(",retryTimes=").append(retryTimes)
- .append(",producer=").append(eventMeshProducer != null ?
eventMeshProducer.producerGroupConfig.getGroupName() : null)
-
.append(",executeTime=").append(DateFormatUtils.format(executeTime,
Constants.DATE_FORMAT_INCLUDE_MILLISECONDS))
- .append(",createTime=").append(DateFormatUtils.format(createTime,
Constants.DATE_FORMAT_INCLUDE_MILLISECONDS)).append("}");
- return sb.toString();
- }
-
- public void retry() throws Exception {
- if (eventMeshProducer == null) {
- log.error("Exception happends during retry. EventMeshProduceer is
null.");
- return;
- }
-
- if (retryTimes > 0) { // retry once
- log.error("Exception happends during retry. The retryTimes > 0.");
- return;
- }
-
- retryTimes++;
- eventMeshProducer.send(this, new SendCallback() {
-
- @Override
- public void onSuccess(SendResult sendResult) {
- }
-
- @Override
- public void onException(OnExceptionContext context) {
- log.warn("", context.getException());
-
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendBatchMsgFailed(1);
- }
-
- });
- }
-
- @Override
- public void run(Timeout timeout) throws Exception {
- retry();
- }
-}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
index d3cc71a17..562c6ba01 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
@@ -23,7 +23,7 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import
org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -73,6 +73,7 @@ public abstract class AbstractHTTPPushRequest extends
RetryContext {
this.retryer =
handleMsgContext.getEventMeshHTTPServer().getHttpRetryer();
this.ttl = handleMsgContext.getTtl();
this.startIdx = ThreadLocalRandom.current().nextInt(0,
totalUrls.size());
+ super.commonConfiguration = eventMeshHttpConfiguration;
}
public void tryHTTPRequest() {
@@ -129,10 +130,9 @@ public abstract class AbstractHTTPPushRequest extends
RetryContext {
}
}
- protected abstract void doRetry();
-
@Override
- public void run(Timeout timeout) throws Exception {
- doRetry();
+ protected ProducerManager getProducerManager() {
+ return eventMeshHTTPServer.getProducerManager();
}
+
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index c60a01f5c..9cf01cd48 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -127,6 +127,7 @@ public class AsyncHTTPPushRequest extends
AbstractHTTPPushRequest {
.withExtension(EventMeshConstants.RSP_GROUP,
handleMsgContext.getConsumerGroup())
.build();
handleMsgContext.setEvent(event);
+ super.setEvent(event);
String content = "";
try {
@@ -316,6 +317,11 @@ public class AsyncHTTPPushRequest extends
AbstractHTTPPushRequest {
return false;
}
+ @Override
+ protected HandleMsgContext getHandleMessageContext() {
+ return handleMsgContext;
+ }
+
ClientRetCode processResponseContent(String content) {
if (StringUtils.isBlank(content)) {
return ClientRetCode.FAIL;
@@ -359,7 +365,7 @@ public class AsyncHTTPPushRequest extends
AbstractHTTPPushRequest {
}
@Override
- public void doRetry() {
+ public void doRun() {
tryHTTPRequest();
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
index ce1cb6ca5..f081cc159 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
@@ -17,8 +17,8 @@
package org.apache.eventmesh.runtime.core.protocol.http.retry;
+import org.apache.eventmesh.retry.api.AbstractRetryer;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
import lombok.extern.slf4j.Slf4j;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java
similarity index 82%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
rename to
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java
index ae77312b8..bd27d6933 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.protocol.grpc.producer;
+package org.apache.eventmesh.runtime.core.protocol.producer;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.runtime.common.ServiceState;
-import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
@@ -53,19 +53,22 @@ public class EventMeshProducer {
mqProducerWrapper.reply(sendMessageContext.getEvent(), sendCallback);
}
- public synchronized void init(EventMeshGrpcConfiguration
eventMeshGrpcConfiguration,
+ public synchronized void init(CommonConfiguration configuration,
ProducerGroupConf producerGroupConfig) throws Exception {
+ if (ServiceState.INITED == serviceState) {
+ return;
+ }
this.producerGroupConfig = producerGroupConfig;
Properties keyValue = new Properties();
keyValue.put(EventMeshConstants.PRODUCER_GROUP,
producerGroupConfig.getGroupName());
keyValue.put(EventMeshConstants.INSTANCE_NAME,
EventMeshUtil.buildMeshClientID(
- producerGroupConfig.getGroupName(),
eventMeshGrpcConfiguration.getEventMeshCluster()));
+ producerGroupConfig.getGroupName(),
configuration.getEventMeshCluster()));
// TODO for defibus
- keyValue.put(EventMeshConstants.EVENT_MESH_IDC,
eventMeshGrpcConfiguration.getEventMeshIDC());
+ keyValue.put(EventMeshConstants.EVENT_MESH_IDC,
configuration.getEventMeshIDC());
mqProducerWrapper = new MQProducerWrapper(
- eventMeshGrpcConfiguration.getEventMeshStoragePluginType());
+ configuration.getEventMeshStoragePluginType());
mqProducerWrapper.init(keyValue);
serviceState = ServiceState.INITED;
log.info("EventMeshProducer [{}] inited...........",
producerGroupConfig.getGroupName());
@@ -82,7 +85,7 @@ public class EventMeshProducer {
}
public synchronized void shutdown() throws Exception {
- if (serviceState == null || ServiceState.INITED == serviceState) {
+ if (serviceState == null || ServiceState.STOPPED == serviceState) {
return;
}
@@ -102,4 +105,12 @@ public class EventMeshProducer {
.append(producerGroupConfig).append("}");
return sb.toString();
}
+
+ public MQProducerWrapper getMqProducerWrapper() {
+ return mqProducerWrapper;
+ }
+
+ public boolean isStarted() {
+ return serviceState == ServiceState.RUNNING;
+ }
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/ProducerManager.java
similarity index 85%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
rename to
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/ProducerManager.java
index 48b3413f6..251790f89 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/ProducerManager.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.protocol.http.producer;
+package org.apache.eventmesh.runtime.core.protocol.producer;
-import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
+import org.apache.eventmesh.runtime.boot.AbstractRemotingServer;
import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,15 +27,15 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ProducerManager {
- private final EventMeshHTTPServer eventMeshHTTPServer;
+ private final AbstractRemotingServer eventMeshServer;
/**
* key: group name
*/
- private final ConcurrentHashMap<String, EventMeshProducer> producerTable =
new ConcurrentHashMap<String, EventMeshProducer>();
+ private final ConcurrentHashMap<String, EventMeshProducer> producerTable =
new ConcurrentHashMap<>();
- public ProducerManager(EventMeshHTTPServer eventMeshHTTPServer) {
- this.eventMeshHTTPServer = eventMeshHTTPServer;
+ public ProducerManager(AbstractRemotingServer eventMeshServer) {
+ this.eventMeshServer = eventMeshServer;
}
public void init() throws Exception {
@@ -81,7 +81,7 @@ public class ProducerManager {
eventMeshProducer = producerTable.get(producerGroup);
- if (!eventMeshProducer.getStarted().get()) {
+ if (!eventMeshProducer.isStarted()) {
eventMeshProducer.start();
}
@@ -93,7 +93,7 @@ public class ProducerManager {
return producerTable.get(producerGroupConfig.getGroupName());
}
EventMeshProducer eventMeshProducer = new EventMeshProducer();
-
eventMeshProducer.init(eventMeshHTTPServer.getEventMeshHttpConfiguration(),
producerGroupConfig);
+ eventMeshProducer.init(eventMeshServer.getConfiguration(),
producerGroupConfig);
producerTable.put(producerGroupConfig.getGroupName(),
eventMeshProducer);
return eventMeshProducer;
}
@@ -109,8 +109,8 @@ public class ProducerManager {
log.info("producerManager shutdown......");
}
- public EventMeshHTTPServer getEventMeshHTTPServer() {
- return eventMeshHTTPServer;
+ public AbstractRemotingServer getEventMeshServer() {
+ return eventMeshServer;
}
public ConcurrentHashMap<String, EventMeshProducer> getProducerTable() {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/SendMessageContext.java
similarity index 90%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
rename to
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/SendMessageContext.java
index 92bf4a27a..0de970f70 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/SendMessageContext.java
@@ -15,15 +15,14 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.protocol.grpc.producer;
+package org.apache.eventmesh.runtime.core.protocol.producer;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
+import org.apache.eventmesh.runtime.boot.AbstractRemotingServer;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
import org.apache.commons.lang3.time.DateFormatUtils;
@@ -44,14 +43,14 @@ public class SendMessageContext extends RetryContext {
private long createTime = System.currentTimeMillis();
- public EventMeshGrpcServer eventMeshGrpcServer;
+ public AbstractRemotingServer eventMeshServer;
public SendMessageContext(String bizSeqNo, CloudEvent event,
EventMeshProducer eventMeshProducer,
- EventMeshGrpcServer eventMeshGrpcServer) {
+ AbstractRemotingServer eventMeshServer) {
this.bizSeqNo = bizSeqNo;
this.event = event;
this.eventMeshProducer = eventMeshProducer;
- this.eventMeshGrpcServer = eventMeshGrpcServer;
+ this.eventMeshServer = eventMeshServer;
}
public String getBizSeqNo() {
@@ -127,7 +126,7 @@ public class SendMessageContext extends RetryContext {
}
@Override
- public void run(Timeout timeout) throws Exception {
+ public void doRun() throws Exception {
retry();
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 8b00bf579..1a4d53f3d 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -24,7 +24,6 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.ServerGlobal;
@@ -195,7 +194,7 @@ public class DownStreamMsgContext extends RetryContext {
}
@Override
- public void run(Timeout timeout) throws Exception {
+ public void doRun() {
retry();
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
index 2150eb562..411299440 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
@@ -18,11 +18,11 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
import org.apache.eventmesh.common.protocol.SubscriptionType;
+import org.apache.eventmesh.retry.api.AbstractRetryer;
+import org.apache.eventmesh.retry.api.timer.TimerTask;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import java.util.concurrent.TimeUnit;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
index ef716801f..d6730a984 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
@@ -27,7 +27,6 @@ import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.Utils;
@@ -163,7 +162,7 @@ public class UpStreamMsgContext extends RetryContext {
}
@Override
- public void run(Timeout timeout) throws Exception {
+ public void doRun() throws Exception {
retry();
}
}
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
index c540358e8..f76379f9e 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
@@ -34,6 +34,7 @@ public enum EventMeshExtensionType {
JDBC_SNAPSHOT_ENGINE("jdbc_snapshot_engine"),
JDBC_DATABASE_DIALECT("jdbc_database_dialect"),
OFFSETMGMT("offsetMgmt"),
+ RETRY("retry"),
;
private final String extensionTypeName;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java b/eventmesh-storage-plugin/eventmesh-storage-api/src/main/java/org/apache/eventmesh/api/TopicNameHelper.java
similarity index 63%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
rename to eventmesh-storage-plugin/eventmesh-storage-api/src/main/java/org/apache/eventmesh/api/TopicNameHelper.java
index b004c6aab..8a7d34f32 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-api/src/main/java/org/apache/eventmesh/api/TopicNameHelper.java
@@ -15,25 +15,21 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.api;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
-import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
/**
- * Retryer interface.
+ * Topic name generator.
*/
-public interface Retryer {
-
- void start();
-
- void shutdown();
-
- long getPendingTimeouts();
-
- void printState();
-
- void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.STORAGE)
+public interface TopicNameHelper {
+ @SneakyThrows
+ default boolean isRetryTopic(String retryTopic) {
+ throw new IllegalAccessException("Method not supported.");
+ }
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/common/TopicNameHelperImpl.java
similarity index 63%
copy from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
copy to eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/common/TopicNameHelperImpl.java
index ce1cb6ca5..e0ee8843e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/common/TopicNameHelperImpl.java
@@ -15,19 +15,20 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.protocol.http.retry;
+package org.apache.eventmesh.storage.rocketmq.common;
-import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
+import org.apache.eventmesh.api.TopicNameHelper;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
-@Slf4j
-public class HttpRetryer extends AbstractRetryer {
+public class TopicNameHelperImpl implements TopicNameHelper {
- private final EventMeshHTTPServer eventMeshHTTPServer;
-
- public HttpRetryer(EventMeshHTTPServer eventMeshHTTPServer) {
- this.eventMeshHTTPServer = eventMeshHTTPServer;
+ @Override
+ public boolean isRetryTopic(String retryTopic) {
+ if (StringUtils.isBlank(retryTopic)) {
+ return false;
+ }
+ return retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
}
}
diff --git a/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.TopicNameHelper b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.TopicNameHelper
new file mode 100644
index 000000000..4670fbc91
--- /dev/null
+++ b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.TopicNameHelper
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketmq=org.apache.eventmesh.storage.rocketmq.common.TopicNameHelperImpl
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 676cc6e79..fd91ebd59 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -90,3 +90,7 @@ include 'eventmesh-webhook:eventmesh-webhook-api'
include 'eventmesh-webhook:eventmesh-webhook-admin'
include 'eventmesh-webhook:eventmesh-webhook-receive'
+include 'eventmesh-retry'
+include 'eventmesh-retry:eventmesh-retry-api'
+include 'eventmesh-retry:eventmesh-retry-rocketmq'
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]