This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new ce92ef7  [ISSUE #48] Support different consumer group by ip (#52)
ce92ef7 is described below

commit ce92ef7207db04cd81d902c0bcf1888d2eaa0104
Author: wangfan <[email protected]>
AuthorDate: Wed Aug 31 10:52:46 2022 +0800

    [ISSUE #48] Support different consumer group by ip (#52)
---
 .../schema/registry/common/utils/CommonUtil.java   | 36 ++++++++++++++++
 .../registry/storage/rocketmq/RocketmqClient.java  | 48 +++++++---------------
 .../rocketmq/configs/RocketmqConfigConstants.java  |  5 ++-
 3 files changed, 53 insertions(+), 36 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
index 4f27ae8..6f21e31 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
@@ -27,6 +27,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -34,6 +37,7 @@ import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -247,4 +251,36 @@ public class CommonUtil {
         return schemaId | schemaVersion;
     }
 
+    public static String getIp() {
+        String ip = "";
+
+        try {
+            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
+            InetAddress inetAddress;
+            boolean found = false;
+            while (netInterfaces.hasMoreElements() && !found) {
+                NetworkInterface ni = netInterfaces.nextElement();
+                Enumeration<InetAddress> address = ni.getInetAddresses();
+                while (address.hasMoreElements()) {
+                    inetAddress = address.nextElement();
+                    if (!inetAddress.isSiteLocalAddress()
+                        && !inetAddress.isLoopbackAddress()
+                        && !inetAddress.getHostAddress().contains(":")) {
+                        ip = inetAddress.getHostAddress();
+                        found = true;
+                        break;
+                    } else if (inetAddress.isSiteLocalAddress()
+                        && !inetAddress.isLoopbackAddress()
+                        && !inetAddress.getHostAddress().contains(":")) {
+                        ip = inetAddress.getHostAddress();
+                    }
+                }
+            }
+        } catch (SocketException e) {
+            throw new SchemaException("Get IP failed", e);
+        }
+
+        return ip;
+    }
+
 }
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index a7b36d3..b782b6c 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -19,25 +19,23 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -86,7 +84,7 @@ import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.Rocke
 public class RocketmqClient {
 
     private DefaultMQProducer producer;
-    private DefaultLitePullConsumer scheduleConsumer;
+    private DefaultMQPushConsumer scheduleConsumer;
     private DefaultMQAdminExt mqAdminExt;
     private String storageTopic;
     private boolean useCompactTopic;
@@ -96,10 +94,6 @@ public class RocketmqClient {
     private final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
     private final Map<String, ColumnFamilyHandle> cfHandleMap = new HashMap<>();
 
-    private ScheduledExecutorService scheduledExecutorService;
-
-    private static final Integer PULL_TASK_INTERVAL = 5 * 1000;
-
     /**
      * RocksDB for cache
      */
@@ -203,39 +197,28 @@ public class RocketmqClient {
         try {
             producer.start();
 
-            scheduleConsumer.setPullThreadNums(4);
+            scheduleConsumer.subscribe(storageTopic, "*");
+            scheduleConsumer.registerMessageListener(new MessageListener());
             scheduleConsumer.start();
-
-            Collection<MessageQueue> messageQueueList = scheduleConsumer.fetchMessageQueues(storageTopic);
-            scheduleConsumer.assign(messageQueueList);
-            messageQueueList.forEach(mq -> {
-                try {
-                    scheduleConsumer.seekToBegin(mq);
-                } catch (MQClientException e) {
-                    e.printStackTrace();
-                }
-            });
-            this.scheduledExecutorService.scheduleAtFixedRate(new RocketmqStoragePullTask(),
-                0, PULL_TASK_INTERVAL, TimeUnit.MILLISECONDS);
-
         } catch (MQClientException e) {
             throw new SchemaException("Rocketmq client start failed", e);
         }
     }
 
-    public class RocketmqStoragePullTask implements Runnable {
+    public class MessageListener implements MessageListenerOrderly {
 
         @Override
-        public void run() {
+        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList,
+            ConsumeOrderlyContext context) {
             try {
-                List<MessageExt> msgList = scheduleConsumer.poll(1000);
                 if (CollectionUtils.isNotEmpty(msgList)) {
                     msgList.forEach(this::consumeMessage);
                 }
-                scheduleConsumer.commitSync();
             } catch (Exception e) {
-                log.error("consume message exception, consume offset may not commit");
+                log.error("consume message exception, reconsume later");
+                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
             }
+            return ConsumeOrderlyStatus.SUCCESS;
         }
 
         private void consumeMessage(MessageExt msg) {
@@ -463,7 +446,7 @@ public class RocketmqClient {
             props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
         );
 
-        this.scheduleConsumer = new DefaultLitePullConsumer(
+        this.scheduleConsumer = new DefaultMQPushConsumer(
             props.getProperty(STORAGE_ROCKETMQ_CONSUMER_GROUP, STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT)
         );
 
@@ -477,9 +460,6 @@ public class RocketmqClient {
         );
 
         this.converter = new JsonConverterImpl();
-
-        this.scheduledExecutorService =
-            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RocketmqStoragePullTask"));
     }
 
     private ColumnFamilyHandle schemaCfHandle() {
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
index b81f93a..9781e04 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.schema.registry.storage.rocketmq.configs;
 
 import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
 
 public class RocketmqConfigConstants {
 
@@ -28,8 +29,8 @@ public class RocketmqConfigConstants {
     public static final String STORAGE_ROCKETMQ_PRODUCER_GROUP_DEFAULT = "default";
 
     public static final String STORAGE_ROCKETMQ_CONSUMER_GROUP = "storage.rocketmq.consumer.group";
-    // TODO : ip
-    public static final String STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT = "default";
+    public static final String STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT =
+        CommonUtil.getIp().replace(".", "_");
 
     public static final String STORAGE_ROCKETMQ_NAMESRV = "storage.rocketmq.namesrv";
     public static final String STORAGE_ROCKETMQ_NAMESRV_DEFAULT = "http://localhost:9876";

Reply via email to