This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new b8516c0 feat:targets own isolate consumerGroup
b8516c0 is described below
commit b8516c02b6f49af5da3c7857ed7522ae3a5f034d
Author: changfeng <[email protected]>
AuthorDate: Mon Apr 24 16:22:59 2023 +0800
feat:targets own isolate consumerGroup
---
.../boot/listener/RocketMQEventSubscriber.java | 157 ++++++++++++---------
.../{consumer => rocketmq}/ClientConfig.java | 2 +-
.../{consumer => rocketmq}/ConsumeRequest.java | 2 +-
.../ExponentialRetryPolicy.java | 2 +-
.../{consumer => rocketmq}/LitePullConsumer.java | 2 +-
.../LitePullConsumerImpl.java | 13 +-
.../{consumer => rocketmq}/LocalMessageCache.java | 2 +-
.../{consumer => rocketmq}/RetryPolicy.java | 2 +-
8 files changed, 104 insertions(+), 78 deletions(-)
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index b4c7527..338e3ff 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -37,9 +37,10 @@ import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.utils.NetworkUtil;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.ClientConfig;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.LitePullConsumer;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.LitePullConsumerImpl;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq.ClientConfig;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq.LitePullConsumer;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq.LitePullConsumerImpl;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
@@ -54,14 +55,18 @@ import
org.springframework.core.io.support.PropertiesLoaderUtils;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.springframework.stereotype.Component;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* RocketMQ implement event subscriber
@@ -71,7 +76,7 @@ public class RocketMQEventSubscriber extends EventSubscriber {
private static final Logger logger =
LoggerFactory.getLogger(RocketMQEventSubscriber.class);
- private LitePullConsumer pullConsumer;
+ private final BlockingQueue<MessageExt> messageBuffer = new
LinkedBlockingQueue<>(50000);
@Autowired
private final TargetRunnerConfigObserver runnerConfigObserver;
@@ -82,46 +87,42 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
private ClientConfig clientConfig;
private SessionCredentials sessionCredentials;
private String socksProxy;
+ private Map<String, ConsumeWorker> consumeWorkerMap = new
ConcurrentHashMap<>();
private static final String SEMICOLON = ";";
private static final String SYS_DEFAULT_GROUP =
"event-bridge-default-group";
public static final String QUEUE_OFFSET = "queueOffset";
+ private static final String RUNNER_NAME = "runnerName";
public RocketMQEventSubscriber(TargetRunnerConfigObserver
runnerConfigObserver) {
this.runnerConfigObserver = runnerConfigObserver;
this.initMqProperties();
- this.initPullConsumer();
+ this.initConsumeWorkers(runnerConfigObserver);
}
@Override
public void refresh(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum
refreshTypeEnum) {
- if(Objects.isNull(pullConsumer)){
- pullConsumer = initLitePullConsumer();
- return;
- }
- Set<String> currentTopics =
parseTopicsByRunnerConfigs(Sets.newHashSet(targetRunnerConfig));
- for (String topic : currentTopics){
- switch (refreshTypeEnum){
- case ADD:
- case UPDATE:
- subscribe(topic);
- break;
- case DELETE:
- unSubscribe(topic);
- break;
- default:
- break;
- }
+ switch (refreshTypeEnum) {
+ case ADD:
+ case UPDATE:
+ putConsumeWorker(targetRunnerConfig);
+ break;
+ case DELETE:
+ removeConsumeWorker(targetRunnerConfig);
+ break;
+ default:
+ break;
}
}
@Override
public List<ConnectRecord> pull() {
- List<MessageExt> messages = pullConsumer.poll(pullBatchSize,
Duration.ofSeconds(pullTimeOut));
+ ArrayList<MessageExt> messages = new ArrayList<>();
+ messageBuffer.drainTo(messages, pullBatchSize);
if (CollectionUtils.isEmpty(messages)) {
- logger.info("consumer poll message empty , consumer - {}",
JSON.toJSONString(pullConsumer));
+ logger.info("consumer poll message empty.");
return null;
}
List<ConnectRecord> connectRecords = Lists.newArrayList();
@@ -148,23 +149,17 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
/**
* parse topics by specific target runner configs
- * @param targetRunnerConfigs
+ * @param targetRunnerConfig
* @return
*/
- public Set<String> parseTopicsByRunnerConfigs(Set<TargetRunnerConfig>
targetRunnerConfigs){
- if(CollectionUtils.isEmpty(targetRunnerConfigs)){
- logger.warn("target runner config is empty, parse to topic
failed!");
- return null;
- }
+ public Set<String> parseTopicsByRunnerConfig(TargetRunnerConfig
targetRunnerConfig){
Set<String> listenTopics = Sets.newHashSet();
- for(TargetRunnerConfig runnerConfig : targetRunnerConfigs){
- List<Map<String,String>> runnerConfigMap =
runnerConfig.getComponents();
- if(CollectionUtils.isEmpty(runnerConfigMap)){
- logger.warn("target runner config components is empty, config
info - {}", runnerConfig);
- continue;
- }
-
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CHANNEL_NAME));
+ List<Map<String,String>> runnerConfigMap =
targetRunnerConfig.getComponents();
+ if (CollectionUtils.isEmpty(runnerConfigMap)){
+ logger.warn("target runner config components is empty, config info
- {}", targetRunnerConfig);
+ return listenTopics;
}
+
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CHANNEL_NAME));
return listenTopics;
}
@@ -218,28 +213,25 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
/**
* init rocket mq pull consumer
*/
- private void initPullConsumer() {
- pullConsumer = initLitePullConsumer();
+    private void initConsumeWorkers(TargetRunnerConfigObserver runnerConfigObserver) {
+        // One dedicated pull consumer and worker thread per configured target runner,
+        // registered in consumeWorkerMap under the runner's name.
+        runnerConfigObserver.getTargetRunnerConfig().forEach(runnerConfig -> {
+            final LitePullConsumer runnerConsumer = initLitePullConsumer(runnerConfig);
+            final String workerKey = runnerConfig.getName();
+            final ConsumeWorker worker = new ConsumeWorker(runnerConsumer, workerKey);
+            consumeWorkerMap.put(workerKey, worker);
+            worker.start();
+        });
     }
/**
* first init default rocketmq pull consumer
* @return
*/
- public LitePullConsumer initLitePullConsumer() {
- if (pullConsumer != null) {
- try {
- pullConsumer.shutdown();
- } catch (Exception e) {
- logger.error("Consumer shutdown failed.", e);
- }
- }
-
- Set<TargetRunnerConfig> targetRunnerConfigs =
runnerConfigObserver.getTargetRunnerConfig();
- Set<String> topics = parseTopicsByRunnerConfigs(targetRunnerConfigs);
+ public LitePullConsumer initLitePullConsumer(TargetRunnerConfig
targetRunnerConfig) {
+ Set<String> topics = parseTopicsByRunnerConfig(targetRunnerConfig);
RPCHook rpcHook = this.sessionCredentials != null ? new
AclClientRPCHook(this.sessionCredentials) : null;
- this.pullConsumer = new LitePullConsumerImpl(this.clientConfig,
rpcHook);
+ LitePullConsumerImpl pullConsumer = new
LitePullConsumerImpl(this.clientConfig, rpcHook);
if (StringUtils.isNotBlank(this.socksProxy)) {
pullConsumer.setSockProxyJson(this.socksProxy);
}
@@ -310,20 +302,59 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
return recordOffset;
}
- /**
- * new topic for subscribe
- * @param topic
- */
- private void subscribe(String topic) {
- pullConsumer.subscribe(topic);
+    /**
+     * Create, or replace, the consume worker that serves the given target runner.
+     * <p>
+     * A worker already registered under the runner's name is unregistered and shut down
+     * first; then a new pull consumer and worker are built from the supplied config,
+     * registered and started.
+     *
+     * @param targetRunnerConfig config of the target runner that was added or updated
+     */
+    private void putConsumeWorker(TargetRunnerConfig targetRunnerConfig) {
+        String runnerName = targetRunnerConfig.getName();
+        // Unregister before shutting down: if building the replacement consumer fails below,
+        // the map must not keep pointing at a worker that has already been stopped.
+        ConsumeWorker previousWorker = consumeWorkerMap.remove(runnerName);
+        if (previousWorker != null) {
+            previousWorker.shutdown();
+        }
+        LitePullConsumer litePullConsumer = initLitePullConsumer(targetRunnerConfig);
+        ConsumeWorker newWorker = new ConsumeWorker(litePullConsumer, runnerName);
+        consumeWorkerMap.put(runnerName, newWorker);
+        newWorker.start();
     }
- /**
- * unsubscribe old topic
- * @param topic
- */
- private void unSubscribe(String topic) {
- pullConsumer.unsubscribe(topic);
+    /**
+     * Unregister the consume worker of the given target runner and shut it down.
+     * Does nothing when no worker is registered under the runner's name.
+     *
+     * @param targetRunnerConfig config of the target runner whose worker is removed
+     */
+    private void removeConsumeWorker(TargetRunnerConfig targetRunnerConfig) {
+        final String runnerName = targetRunnerConfig.getName();
+        final ConsumeWorker removedWorker = consumeWorkerMap.remove(runnerName);
+        if (removedWorker == null) {
+            return;
+        }
+        removedWorker.shutdown();
+    }
+
+    /**
+     * Worker bound to a single target runner: it repeatedly polls that runner's pull
+     * consumer, tags every received message with the runner name (user property
+     * {@code RUNNER_NAME}) and hands it over to the shared {@code messageBuffer}.
+     */
+    class ConsumeWorker extends ServiceThread {
+
+        private final LitePullConsumer pullConsumer;
+        private final String runnerName;
+
+        /**
+         * @param pullConsumer consumer this worker polls
+         * @param runnerName   name of the target runner, stamped onto every pulled message
+         */
+        public ConsumeWorker(LitePullConsumer pullConsumer, String runnerName) {
+            this.pullConsumer = pullConsumer;
+            this.runnerName = runnerName;
+        }
+
+        @Override
+        public String getServiceName() {
+            return ConsumeWorker.class.getSimpleName();
+        }
+
+        /**
+         * Poll loop; runs until {@code stopped} is set or the thread is interrupted.
+         */
+        @Override
+        public void run() {
+            while (!stopped) {
+                try {
+                    List<MessageExt> messages = pullConsumer.poll(pullBatchSize, Duration.ofSeconds(pullTimeOut));
+                    if (messages == null) {
+                        // Nothing pulled in this round; avoid an NPE in the loop below.
+                        continue;
+                    }
+                    for (MessageExt message : messages) {
+                        message.putUserProperty(RUNNER_NAME, runnerName);
+                        // Blocks while the shared buffer is full.
+                        messageBuffer.put(message);
+                    }
+                } catch (InterruptedException interrupted) {
+                    // Restore the interrupt flag and leave the loop instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                    logger.warn(getServiceName() + " - interrupted while buffering records, runner - " + runnerName);
+                    break;
+                } catch (Exception exception) {
+                    logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
+                }
+            }
+        }
+
+        /**
+         * Shut down the pull consumer, then run the inherited shutdown.
+         */
+        @Override
+        public void shutdown() {
+            try {
+                pullConsumer.shutdown();
+            } finally {
+                // Run the ServiceThread shutdown even when closing the consumer fails,
+                // otherwise this worker would keep polling a broken consumer.
+                super.shutdown();
+            }
+        }
     }
}
\ No newline at end of file
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ClientConfig.java
similarity index 99%
rename from
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
rename to
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ClientConfig.java
index 2df7b4e..0be8b2c 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ClientConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ConsumeRequest.java
similarity index 99%
rename from
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
rename to
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ConsumeRequest.java
index c59e2e1..404769a 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ConsumeRequest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.message.MessageExt;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ExponentialRetryPolicy.java
similarity index 99%
rename from
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
rename to
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ExponentialRetryPolicy.java
index c217784..06c0b5b 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ExponentialRetryPolicy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
import com.google.common.base.MoreObjects;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumer.java
similarity index 98%
rename from
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
rename to
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumer.java
index 46f03a3..e9ed516 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumerImpl.java
similarity index 97%
rename from
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
rename to
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumerImpl.java
index 3d6f74a..21e94c2 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumerImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -64,12 +64,14 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
private final ExecutorService executorService =
Executors.newSingleThreadExecutor(
ThreadUtils.newThreadFactory("PullConsumerExecutorService",
false));
+ private static final String DEFAULT_INSTANCE_NAME =
"EventBridge_Consumer_INSTANCE";
+
public LitePullConsumerImpl(final ClientConfig clientConfig, final RPCHook
rpcHook) {
this.clientConfig = clientConfig;
rocketmqPullConsumer = new
DefaultMQPullConsumer(clientConfig.getConsumerGroup(), rpcHook);
rocketmqPullConsumer.setNamesrvAddr(clientConfig.getNameSrvAddr());
rocketmqPullConsumer.setAllocateMessageQueueStrategy(new
AllocateMessageQueueAveragelyByCircle());
-
rocketmqPullConsumer.setInstanceName(buildInstanceName(clientConfig.getNameSrvAddr(),
clientConfig.getConsumerGroup()));
+ rocketmqPullConsumer.setInstanceName(DEFAULT_INSTANCE_NAME);
if (clientConfig.getAccessChannel() != null) {
rocketmqPullConsumer.setAccessChannel(clientConfig.getAccessChannel());
}
@@ -147,13 +149,6 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
rocketmqPullConsumer.setSocksProxyConfig(proxyJson);
}
- private String buildInstanceName(String nameSrvAddr, String consumerGroup)
{
- return UtilAll.getPid()
- + "#" + nameSrvAddr.hashCode()
- + "#" + consumerGroup
- + "#" + System.nanoTime();
- }
-
private RetryPolicy getRetryPolicy(MessageQueue messageQueue) {
return retryPolicies.computeIfAbsent(messageQueue, queue ->
new ExponentialRetryPolicy(Duration.ofMillis(100).toMillis(),
Duration.ofMinutes(10).toMillis(), 2));
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LocalMessageCache.java
similarity index 99%
rename from
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
rename to
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LocalMessageCache.java
index 73d0e87..eaa3538 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LocalMessageCache.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/RetryPolicy.java
similarity index 98%
rename from
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
rename to
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/RetryPolicy.java
index 0b4637f..5753119 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/RetryPolicy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
/**
* @Author changfeng