This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit 3736c07f3d9042a00d8e9ee779e134cfad8a4c26 Author: duhenglucky <[email protected]> AuthorDate: Mon Nov 25 20:20:31 2019 +0800 feat(PullConsumer) add seek to begin/end support --- .../org/apache/rocketmq/ons/api/Constants.java | 2 ++ .../apache/rocketmq/ons/api/bean/ConsumerBean.java | 33 +++++----------------- .../ons/api/impl/rocketmq/PullConsumerImpl.java | 6 ++-- .../ons/sample/consumer/SimplePullConsumer.java | 7 +++-- .../ons/sample/producer/MQTimerProducer.java | 1 + .../ons/sample/producer/SimpleMQProducer.java | 1 - .../sample/producer/SimpleTransactionProducer.java | 2 +- 7 files changed, 17 insertions(+), 35 deletions(-) diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java index 48d8f67..a82c405 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java @@ -19,4 +19,6 @@ package org.apache.rocketmq.ons.api; public class Constants { public static final String TRANSACTION_ID = "__transactionId__"; + + public static final String TOPIC_PARTITION_SEPARATOR = "#"; } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java index 1555be9..c95cd9f 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java @@ -21,8 +21,6 @@ import io.openmessaging.api.ExpressionType; import io.openmessaging.api.MessageListener; import io.openmessaging.api.MessageSelector; import io.openmessaging.api.bean.Subscription; -import io.openmessaging.api.bean.SubscriptionExt; -import java.lang.reflect.Method; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -69,36 +67,19 @@ public class ConsumerBean implements Consumer { Iterator<Entry<Subscription, MessageListener>> it = this.subscriptionTable.entrySet().iterator(); while (it.hasNext()) { Entry<Subscription, MessageListener> next = it.next(); - if ("com.aliyun.openservices.ons.api.impl.notify.ConsumerImpl".equals(this.consumer.getClass().getCanonicalName()) - && (next.getKey() instanceof SubscriptionExt)) { - SubscriptionExt subscription = (SubscriptionExt) next.getKey(); - for (Method method : this.consumer.getClass().getMethods()) { - if ("subscribeNotify".equals(method.getName())) { - try { - method.invoke(consumer, subscription.getTopic(), subscription.getExpression(), - subscription.isPersistence(), next.getValue()); - } catch (Exception e) { - throw new ONSClientException("subscribeNotify invoke exception", e); - } - break; - } - } - } else { - Subscription subscription = next.getKey(); - if (subscription.getType() == null || ExpressionType.TAG.name().equals(subscription.getType())) { + Subscription subscription = next.getKey(); + if (subscription.getType() == null || ExpressionType.TAG.name().equals(subscription.getType())) { - this.subscribe(subscription.getTopic(), subscription.getExpression(), next.getValue()); + this.subscribe(subscription.getTopic(), subscription.getExpression(), next.getValue()); - } else if (ExpressionType.SQL92.name().equals(subscription.getType())) { + } else if (ExpressionType.SQL92.name().equals(subscription.getType())) { - this.subscribe(subscription.getTopic(), MessageSelector.bySql(subscription.getExpression()), next.getValue()); - } else { + this.subscribe(subscription.getTopic(), MessageSelector.bySql(subscription.getExpression()), next.getValue()); + } else { - throw new ONSClientException(String.format("Expression type %s is unknown!", subscription.getType())); - } + throw new ONSClientException(String.format("Expression type %s is unknown!", subscription.getType())); } - } this.consumer.start(); diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java index 37678bd..d364b88 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java @@ -51,8 +51,6 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer private DefaultLitePullConsumer litePullConsumer; - private final String TOPIC_PARTITION_SPLITER = "#"; - private int maxCachedMessageSizeInMiB = 512; private int maxCachedMessageAmount = 5000; @@ -134,14 +132,14 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer private TopicPartition convertToTopicPartition(MessageQueue messageQueue) { String topic = messageQueue.getTopic(); - String partition = messageQueue.getBrokerName() + TOPIC_PARTITION_SPLITER + messageQueue.getQueueId(); + String partition = messageQueue.getBrokerName() + Constants.TOPIC_PARTITION_SEPARATOR + messageQueue.getQueueId(); TopicPartition topicPartition = new TopicPartition(topic, partition); return topicPartition; } private MessageQueue convertToMessageQueue(TopicPartition topicPartition) { String topic = topicPartition.getTopic(); - String[] tmp = topicPartition.getPartition().split(TOPIC_PARTITION_SPLITER); + String[] tmp = topicPartition.getPartition().split(Constants.TOPIC_PARTITION_SEPARATOR); if (tmp.length != 2) { LOGGER.warn("Failed to get message queue from TopicPartition: {}", topicPartition); throw new ONSClientException("Failed to get message queue"); diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java index 91bf280..5037c24 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java @@ -28,6 +28,8 @@ import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.sample.MQConfig; public class SimplePullConsumer { + public static volatile boolean running = true; + public static void main(String[] args) { MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876"); @@ -52,12 +54,11 @@ public class SimplePullConsumer { Set<TopicPartition> topicPartitions = consumer.topicPartitions(MQConfig.TOPIC); consumer.assign(topicPartitions); - while (true){ + while (running) { List<Message> messages = consumer.poll(3000); System.out.printf("Received message: %s %n", messages); consumer.commitSync(); } - - + consumer.shutdown(); } } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java index fcf7554..6c81c59 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java @@ -63,5 +63,6 @@ public class MQTimerProducer { e.printStackTrace(); } } + producer.shutdown(); } } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java index 5bb7f9e..c07389c 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java @@ -26,7 +26,6 @@ import java.util.Properties; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.sample.MQConfig; -// io.openmessaging.api.xxx => com.aliyun.openservices.ons.api.xxxx public class SimpleMQProducer { diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java index 2e41e22..c1d890b 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java @@ -70,7 +70,7 @@ public class SimpleTransactionProducer { e.printStackTrace(); } } - + transactionProducer.shutdown(); System.out.printf("Send transaction message success. %n"); } } \ No newline at end of file
