Repository: incubator-rocketmq
Updated Branches:
  refs/heads/openmessaging-impl 85e85123d -> 28a6deacd (forced update)


Clean expired messages in the OMS pull consumer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/28a6deac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/28a6deac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/28a6deac

Branch: refs/heads/openmessaging-impl
Commit: 28a6deacdc9f03a0b824cf0d87522b8fc18ccbd8
Parents: 2e3c1b0
Author: yukon <[email protected]>
Authored: Wed Apr 19 17:12:29 2017 +0800
Committer: yukon <[email protected]>
Committed: Wed Apr 19 17:55:49 2017 +0800

----------------------------------------------------------------------
 .../example/openmessaging/SimpleProducer.java   |   2 +-
 .../openmessaging/SimplePullConsumer.java       |   2 +-
 .../openmessaging/SimplePushConsumer.java       |   2 +-
 .../rocketmq/consumer/LocalMessageCache.java    | 105 ++++++++++++++++++-
 .../rocketmq/consumer/PullConsumerImpl.java     |   3 +
 5 files changed, 108 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/28a6deac/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
----------------------------------------------------------------------
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index f89ae4c..ac7f7c4 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -28,7 +28,7 @@ import java.nio.charset.Charset;
 public class SimpleProducer {
     public static void main(String[] args) {
         final MessagingAccessPoint messagingAccessPoint = 
MessagingAccessPointFactory
-            
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
+            
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
 
         final Producer producer = messagingAccessPoint.createProducer();
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/28a6deac/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
----------------------------------------------------------------------
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 86cb696..36b6b1d 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -27,7 +27,7 @@ import io.openmessaging.rocketmq.domain.NonStandardKeys;
 public class SimplePullConsumer {
     public static void main(String[] args) {
         final MessagingAccessPoint messagingAccessPoint = 
MessagingAccessPointFactory
-            
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
+            
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
 
         final PullConsumer consumer = 
messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
             OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, 
"OMS_CONSUMER"));

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/28a6deac/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
----------------------------------------------------------------------
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index 813e301..84c1b15 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -29,7 +29,7 @@ import io.openmessaging.rocketmq.domain.NonStandardKeys;
 public class SimplePushConsumer {
     public static void main(String[] args) {
         final MessagingAccessPoint messagingAccessPoint = 
MessagingAccessPointFactory
-            
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
+            
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
 
         final PushConsumer consumer = messagingAccessPoint.
             
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, 
"OMS_CONSUMER"));

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/28a6deac/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
----------------------------------------------------------------------
diff --git 
a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
 
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
index 0ffd36c..9afc4c9 100644
--- 
a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
+++ 
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -18,27 +18,39 @@ package io.openmessaging.rocketmq.consumer;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.PropertyKeys;
+import io.openmessaging.ServiceLifecycle;
 import io.openmessaging.rocketmq.ClientConfig;
 import io.openmessaging.rocketmq.domain.ConsumeRequest;
 import java.util.Collections;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 
-class LocalMessageCache {
+class LocalMessageCache implements ServiceLifecycle {
     private final BlockingQueue<ConsumeRequest> consumeRequestCache;
     private final Map<String, ConsumeRequest> consumedRequest;
     private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
     private final DefaultMQPullConsumer rocketmqPullConsumer;
     private final ClientConfig clientConfig;
+    private final ScheduledExecutorService cleanExpireMsgExecutors;
+
     private final static Logger log = ClientLogger.getLog();
 
     LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final 
ClientConfig clientConfig) {
@@ -47,6 +59,8 @@ class LocalMessageCache {
         this.pullOffsetTable = new ConcurrentHashMap<>();
         this.rocketmqPullConsumer = rocketmqPullConsumer;
         this.clientConfig = clientConfig;
+        this.cleanExpireMsgExecutors = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "OMS_CleanExpireMsgScheduledThread_"));
     }
 
     int nextPullBatchNums() {
@@ -92,9 +106,11 @@ class LocalMessageCache {
         try {
             ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, 
TimeUnit.MILLISECONDS);
             if (consumeRequest != null) {
+                MessageExt messageExt = consumeRequest.getMessageExt();
                 
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
-                consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), 
consumeRequest);
-                return consumeRequest.getMessageExt();
+                MessageAccessor.setConsumeStartTimeStamp(messageExt, 
String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
+                consumedRequest.put(messageExt.getMsgId(), consumeRequest);
+                return messageExt;
             }
         } catch (InterruptedException ignore) {
         }
@@ -112,4 +128,87 @@ class LocalMessageCache {
             }
         }
     }
+
+    void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, 
final MessageExt messageExt) {
+        consumedRequest.remove(messageExt.getMsgId());
+        long offset = 
processQueue.removeMessage(Collections.singletonList(messageExt));
+        try {
+            rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset);
+        } catch (MQClientException e) {
+            log.error("A error occurred in update consume offset process.", e);
+        }
+    }
+
+    @Override
+    public void startup() {
+        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                cleanExpireMsg();
+            }
+        }, clientConfig.getRmqMessageConsumeTimeout(), 
clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES);
+    }
+
+    /**
+     * Stops the expired-message cleaner by handing its executor to
+     * {@code ThreadUtils.shutdownGracefully} with a 5000 ms timeout.
+     */
+    @Override
+    public void shutdown() {
+        final long timeoutMillis = 5000;
+        ThreadUtils.shutdownGracefully(this.cleanExpireMsgExecutors, timeoutMillis,
+            TimeUnit.MILLISECONDS);
+    }
+
+    private void cleanExpireMsg() {
+        for (final Map.Entry<MessageQueue, ProcessQueue> next : 
rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
+            .getRebalanceImpl().getProcessQueueTable().entrySet()) {
+            ProcessQueue pq = next.getValue();
+            MessageQueue mq = next.getKey();
+            ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
+            if (lockTreeMap == null) {
+                log.error("Gets tree map lock in process queue error, may be 
has compatibility issue");
+                return;
+            }
+
+            TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
+
+            int loop = msgTreeMap.size();
+            for (int i = 0; i < loop; i++) {
+                MessageExt msg = null;
+                try {
+                    lockTreeMap.readLock().lockInterruptibly();
+                    try {
+                        if (!msgTreeMap.isEmpty()) {
+                            msg = msgTreeMap.firstEntry().getValue();
+                            System.out.println(msg);
+                            if (System.currentTimeMillis() - 
Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
+                                > clientConfig.getRmqMessageConsumeTimeout() * 
60 * 1000) {
+                                //Expired, ack and remove it.
+                            } else {
+                                break;
+                            }
+                        } else {
+                            break;
+                        }
+                    } finally {
+                        lockTreeMap.readLock().unlock();
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Gets expired message exception", e);
+                }
+
+                try {
+                    rocketmqPullConsumer.sendMessageBack(msg, 3);
+                    log.info("Send expired msg back. topic={}, msgId={}, 
storeHost={}, queueId={}, queueOffset={}",
+                        msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), 
msg.getQueueId(), msg.getQueueOffset());
+                    ack(mq, pq, msg);
+                } catch (Exception e) {
+                    log.error("Send back expired msg exception", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reads the {@code lockTreeMap} field declared on the given process queue through
+     * reflection, forcing access.
+     *
+     * @param pq the process queue to inspect
+     * @return the lock, or {@code null} when the field is missing, inaccessible or not a
+     *         {@code ReadWriteLock}
+     */
+    private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
+        try {
+            return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
+        } catch (IllegalAccessException | IllegalArgumentException | ClassCastException e) {
+            // FieldUtils reports a field that cannot be found with IllegalArgumentException,
+            // and a field of another type fails the cast; both are treated like an
+            // inaccessible field so the caller gets null instead of an exception.
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/28a6deac/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
----------------------------------------------------------------------
diff --git 
a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
 
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index 56a49a4..5d4e7d9 100644
--- 
a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ 
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -114,6 +114,7 @@ public class PullConsumerImpl implements PullConsumer {
             try {
                 registerPullTaskCallback();
                 this.pullConsumerScheduleService.start();
+                this.localMessageCache.startup();
             } catch (MQClientException e) {
                 throw new OMSRuntimeException("-1", e);
             }
@@ -136,6 +137,7 @@ public class PullConsumerImpl implements PullConsumer {
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
                             if (pq != null) {
+                                pq.putMessage(pullResult.getMsgFoundList());
                                 for (final MessageExt messageExt : 
pullResult.getMsgFoundList()) {
                                     localMessageCache.submitConsumeRequest(new 
ConsumeRequest(messageExt, mq, pq));
                                 }
@@ -155,6 +157,7 @@ public class PullConsumerImpl implements PullConsumer {
     @Override
     public synchronized void shutdown() {
         if (this.started) {
+            this.localMessageCache.shutdown();
             this.pullConsumerScheduleService.shutdown();
             this.rocketmqPullConsumer.shutdown();
         }

Reply via email to