vongosling closed pull request #190: [ROCKETMQ-309] Add new consumer implement
URL: https://github.com/apache/rocketmq/pull/190
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQBroadcastFromLastConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQBroadcastFromLastConsumer.java
new file mode 100644
index 000000000..d25f1681c
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQBroadcastFromLastConsumer.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.*;
+
+/**
+ * Schedule service for pull consumer,which use broadcast model  and consume 
message from last offset
+ */
+public class MQBroadcastFromLastConsumer {
+    private final Logger log = ClientLogger.getLog();
+
+    private final MessageQueueListener messageQueueListener = new 
MessageQueueListenerImpl();
+    private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
+            new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+    private DefaultMQPullConsumer defaultMQPullConsumer;
+    private int pullThreadNums;
+    private ConcurrentHashMap<String /* topic */, PullTaskCallback> 
callbackTable =
+            new ConcurrentHashMap<String, PullTaskCallback>();
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+    private Executor rebalanceExecutor = Executors.newSingleThreadExecutor();
+
+    public MQBroadcastFromLastConsumer(final String consumerGroup, int 
pullThreadNums) {
+        this.pullThreadNums = pullThreadNums;
+        this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
+        this.defaultMQPullConsumer.setMessageModel(MessageModel.BROADCASTING);
+    }
+
+    public void putTask(final String topic, final Set<MessageQueue> mqNewSet) {
+        rebalanceExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                Iterator<Entry<MessageQueue, PullTaskImpl>> it = 
MQBroadcastFromLastConsumer.this.taskTable.entrySet().iterator();
+                while (it.hasNext()) {
+                    Entry<MessageQueue, PullTaskImpl> next = it.next();
+                    if (next.getKey().getTopic().equals(topic)) {
+                        if (!mqNewSet.contains(next.getKey())) {
+                            next.getValue().setCancelled(true);
+                            it.remove();
+                        }
+                    }
+                }
+
+                for (MessageQueue mq : mqNewSet) {
+                    if (!taskTable.containsKey(mq)) {
+                        long offset = 0;
+                        try {
+                            offset = defaultMQPullConsumer.maxOffset(mq);
+                        } catch (MQClientException e) {
+                            log.error("get max offset error:{}", 
e.getMessage());
+                        }
+                        if (offset < 0) {
+                            offset = 0;
+                        }
+
+                        
MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getOffsetStore().updateOffset(mq,
 offset, false);
+                        PullTaskImpl command = new PullTaskImpl(mq);
+                        MQBroadcastFromLastConsumer.this.taskTable.put(mq, 
command);
+                        
MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(command, 
0, TimeUnit.MILLISECONDS);
+                    }
+                }
+            }
+        });
+    }
+
+    public void start() throws MQClientException {
+        final String group = this.defaultMQPullConsumer.getConsumerGroup();
+        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+                this.pullThreadNums,
+                new ThreadFactoryImpl("PullMsgThread-" + group)
+        );
+
+        
this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);
+
+        this.defaultMQPullConsumer.start();
+
+        log.info("MQBroadcastFromLastConsumer start OK, {} {}",
+                this.defaultMQPullConsumer.getConsumerGroup(), 
this.callbackTable);
+    }
+
+    public void registerPullTaskCallback(final String topic, final 
PullTaskCallback callback) {
+        this.callbackTable.put(topic, callback);
+        this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
+    }
+
+    public void shutdown() {
+        if (this.scheduledThreadPoolExecutor != null) {
+            this.scheduledThreadPoolExecutor.shutdown();
+        }
+
+        if (this.defaultMQPullConsumer != null) {
+            this.defaultMQPullConsumer.shutdown();
+        }
+    }
+
+    class MessageQueueListenerImpl implements MessageQueueListener {
+        @Override
+        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, 
Set<MessageQueue> mqDivided) {
+            log.info("messageQueueChanged,topic:[{}]", topic);
+            MessageModel messageModel =
+                    
MQBroadcastFromLastConsumer.this.defaultMQPullConsumer.getMessageModel();
+            switch (messageModel) {
+                case BROADCASTING:
+                    MQBroadcastFromLastConsumer.this.putTask(topic, mqAll);
+                    break;
+                case CLUSTERING:
+                    MQBroadcastFromLastConsumer.this.putTask(topic, mqDivided);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
+    }
+
+    public void setInstanceName(String instanceName) {
+        this.defaultMQPullConsumer.setInstanceName(instanceName);
+    }
+
+    class PullTaskImpl implements Runnable {
+        private final MessageQueue messageQueue;
+
+        private volatile boolean cancelled = false;
+
+        public PullTaskImpl(final MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        @Override
+        public void run() {
+            //log.debug("run pull task, if cancel:{}", this.cancelled);
+            String topic = this.messageQueue.getTopic();
+            if (!this.isCancelled()) {
+                PullTaskCallback pullTaskCallback =
+                        
MQBroadcastFromLastConsumer.this.callbackTable.get(topic);
+                if (pullTaskCallback != null) {
+                    final PullTaskContext context = new PullTaskContext();
+                    
context.setPullConsumer(MQBroadcastFromLastConsumer.this.defaultMQPullConsumer);
+                    try {
+                        pullTaskCallback.doPullTask(this.messageQueue, 
context);
+                    } catch (Throwable e) {
+                        context.setPullNextDelayTimeMillis(1000);
+                        log.error("doPullTask Exception", e);
+                    }
+
+                    if (!this.isCancelled()) {
+                        
MQBroadcastFromLastConsumer.this.scheduledThreadPoolExecutor.schedule(this,
+                                context.getPullNextDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+                    } else {
+                        log.warn("The Pull Task is cancelled after doPullTask, 
{}", messageQueue);
+                    }
+                } else {
+                    log.warn("Pull Task Callback not exist , {}", topic);
+                }
+            } else {
+                log.warn("The Pull Task is cancelled, {}", messageQueue);
+            }
+        }
+
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        public void setCancelled(boolean cancelled) {
+            this.cancelled = cancelled;
+        }
+    }
+}
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/MQBroadcastFromLastConsumerTest.java
 
b/example/src/main/java/org/apache/rocketmq/example/simple/MQBroadcastFromLastConsumerTest.java
new file mode 100644
index 000000000..e2316a89c
--- /dev/null
+++ 
b/example/src/main/java/org/apache/rocketmq/example/simple/MQBroadcastFromLastConsumerTest.java
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.consumer.*;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.util.List;
+
+/**
+ * Created by l_yy on 2017/11/2.
+ */
+public class MQBroadcastFromLastConsumerTest {
+
+
+
+    public static void main(String[] args) throws MQClientException {
+
+
+        final MQBroadcastFromLastConsumer scheduleService = new 
MQBroadcastFromLastConsumer("test_topic_group", 20);
+
+        scheduleService.setNamesrvAddr("127.0.0.1:9876");
+        scheduleService.setInstanceName("RISK_CONTROL_CORE_CONSUMER");
+
+        scheduleService.registerPullTaskCallback("test_topic", new 
PullTaskCallback() {
+            @Override
+            public void doPullTask(MessageQueue mq, PullTaskContext context) {
+                DefaultMQPullConsumer consumer = 
(DefaultMQPullConsumer)context.getPullConsumer();
+                try {
+                    long offset = consumer.fetchConsumeOffset(mq, false);
+                    if (offset < 0)
+                        offset = 0;
+
+                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
+                    if (pullResult == null) {
+                        return;
+                    }
+
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            doFound(pullResult);
+                        case NO_MATCHED_MSG:
+                            context.setPullNextDelayTimeMillis(100);
+                            break;
+                        case NO_NEW_MSG:
+                            context.setPullNextDelayTimeMillis(100);
+                            break;
+                        case OFFSET_ILLEGAL:
+                            context.setPullNextDelayTimeMillis(100);
+                            break;
+                        default:
+                            context.setPullNextDelayTimeMillis(100);
+                            break;
+                    }
+
+                    consumer.updateConsumeOffset(mq, 
pullResult.getNextBeginOffset());
+                } catch (Exception e) {
+
+                }
+            }
+        });
+
+        scheduleService.start();
+
+    }
+
+
+    public static void doFound(PullResult pullResult) {
+        List<MessageExt> msgFoundList= pullResult.getMsgFoundList();
+
+        for (MessageExt messageExt : msgFoundList) {
+            try {
+                String content = new String(messageExt.getBody(), 
RemotingHelper.DEFAULT_CHARSET);
+
+                System.out.println(content);
+            } catch (Exception e) {
+            }
+        }
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to