This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new ce92ef7 [ISSUE #48] Support different consumer group by ip (#52)
ce92ef7 is described below
commit ce92ef7207db04cd81d902c0bcf1888d2eaa0104
Author: wangfan <[email protected]>
AuthorDate: Wed Aug 31 10:52:46 2022 +0800
[ISSUE #48] Support different consumer group by ip (#52)
---
.../schema/registry/common/utils/CommonUtil.java | 36 ++++++++++++++++
.../registry/storage/rocketmq/RocketmqClient.java | 48 +++++++---------------
.../rocketmq/configs/RocketmqConfigConstants.java | 5 ++-
3 files changed, 53 insertions(+), 36 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
index 4f27ae8..6f21e31 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
@@ -27,6 +27,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -34,6 +37,7 @@ import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -247,4 +251,36 @@ public class CommonUtil {
return schemaId | schemaVersion;
}
+ /**
+  * Picks an IPv4 address of the local host by scanning every network interface.
+  *
+  * <p>Loopback, link-local (169.254.x.x) and IPv6 addresses are ignored. The first
+  * remaining address that is not site-local is returned immediately; otherwise the
+  * last site-local address seen during the scan is returned.
+  *
+  * @return the chosen address in textual form, or an empty string when no usable
+  *         IPv4 address was found
+  * @throws SchemaException if the network interfaces cannot be enumerated
+  */
+ public static String getIp() {
+     String ip = "";
+
+     try {
+         Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
+         // Older JDKs may return null instead of an empty enumeration when the
+         // host has no interfaces.
+         if (netInterfaces == null) {
+             return ip;
+         }
+         while (netInterfaces.hasMoreElements()) {
+             NetworkInterface ni = netInterfaces.nextElement();
+             Enumeration<InetAddress> addresses = ni.getInetAddresses();
+             while (addresses.hasMoreElements()) {
+                 InetAddress inetAddress = addresses.nextElement();
+                 String hostAddress = inetAddress.getHostAddress();
+                 // A link-local address is neither site-local nor loopback, so without
+                 // this check it would be mistaken for a public address below.
+                 if (inetAddress.isLoopbackAddress()
+                     || inetAddress.isLinkLocalAddress()
+                     || hostAddress.contains(":")) {
+                     continue;
+                 }
+                 if (!inetAddress.isSiteLocalAddress()) {
+                     // Non-site-local address: preferred, stop scanning.
+                     return hostAddress;
+                 }
+                 // Site-local address: remember it as the fallback (last one wins).
+                 ip = hostAddress;
+             }
+         }
+     } catch (SocketException e) {
+         throw new SchemaException("Get IP failed", e);
+     }
+
+     return ip;
+ }
+
}
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index a7b36d3..b782b6c 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -19,25 +19,23 @@ package
org.apache.rocketmq.schema.registry.storage.rocketmq;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
@@ -86,7 +84,7 @@ import static
org.apache.rocketmq.schema.registry.storage.rocketmq.configs.Rocke
public class RocketmqClient {
private DefaultMQProducer producer;
- private DefaultLitePullConsumer scheduleConsumer;
+ private DefaultMQPushConsumer scheduleConsumer;
private DefaultMQAdminExt mqAdminExt;
private String storageTopic;
private boolean useCompactTopic;
@@ -96,10 +94,6 @@ public class RocketmqClient {
private final List<ColumnFamilyDescriptor> cfDescriptors = new
ArrayList<>();
private final Map<String, ColumnFamilyHandle> cfHandleMap = new
HashMap<>();
- private ScheduledExecutorService scheduledExecutorService;
-
- private static final Integer PULL_TASK_INTERVAL = 5 * 1000;
-
/**
* RocksDB for cache
*/
@@ -203,39 +197,28 @@ public class RocketmqClient {
try {
producer.start();
- scheduleConsumer.setPullThreadNums(4);
+ scheduleConsumer.subscribe(storageTopic, "*");
+ scheduleConsumer.registerMessageListener(new MessageListener());
scheduleConsumer.start();
-
- Collection<MessageQueue> messageQueueList =
scheduleConsumer.fetchMessageQueues(storageTopic);
- scheduleConsumer.assign(messageQueueList);
- messageQueueList.forEach(mq -> {
- try {
- scheduleConsumer.seekToBegin(mq);
- } catch (MQClientException e) {
- e.printStackTrace();
- }
- });
- this.scheduledExecutorService.scheduleAtFixedRate(new
RocketmqStoragePullTask(),
- 0, PULL_TASK_INTERVAL, TimeUnit.MILLISECONDS);
-
} catch (MQClientException e) {
throw new SchemaException("Rocketmq client start failed", e);
}
}
- public class RocketmqStoragePullTask implements Runnable {
+ public class MessageListener implements MessageListenerOrderly {
+ /**
+  * Handles one batch of messages delivered by the push consumer.
+  *
+  * @param msgList messages delivered for the subscribed topic; may be empty
+  * @param context orderly-consume context supplied by the client (unused here)
+  * @return {@code SUCCESS} when every message was handled, or
+  *         {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} when handling threw
+  */
@Override
- public void run() {
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList,
+ ConsumeOrderlyContext context) {
try {
- List<MessageExt> msgList = scheduleConsumer.poll(1000);
if (CollectionUtils.isNotEmpty(msgList)) {
msgList.forEach(this::consumeMessage);
}
- scheduleConsumer.commitSync();
} catch (Exception e) {
- log.error("consume message exception, consume offset may not
commit");
+ // Pass the exception to the logger so the failure cause is not lost.
+ log.error("consume message exception, reconsume later", e);
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
+ return ConsumeOrderlyStatus.SUCCESS;
}
private void consumeMessage(MessageExt msg) {
@@ -463,7 +446,7 @@ public class RocketmqClient {
props.getProperty(STORAGE_ROCKETMQ_NAMESRV,
STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
);
- this.scheduleConsumer = new DefaultLitePullConsumer(
+ this.scheduleConsumer = new DefaultMQPushConsumer(
props.getProperty(STORAGE_ROCKETMQ_CONSUMER_GROUP,
STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT)
);
@@ -477,9 +460,6 @@ public class RocketmqClient {
);
this.converter = new JsonConverterImpl();
-
- this.scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("RocketmqStoragePullTask"));
}
private ColumnFamilyHandle schemaCfHandle() {
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
index b81f93a..9781e04 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.schema.registry.storage.rocketmq.configs;
import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
public class RocketmqConfigConstants {
@@ -28,8 +29,8 @@ public class RocketmqConfigConstants {
public static final String STORAGE_ROCKETMQ_PRODUCER_GROUP_DEFAULT =
"default";
public static final String STORAGE_ROCKETMQ_CONSUMER_GROUP =
"storage.rocketmq.consumer.group";
- // TODO : ip
- public static final String STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT =
"default";
+ public static final String STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT =
+ CommonUtil.getIp().replace(".", "_");
public static final String STORAGE_ROCKETMQ_NAMESRV =
"storage.rocketmq.namesrv";
public static final String STORAGE_ROCKETMQ_NAMESRV_DEFAULT =
"http://localhost:9876";