This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit efeaebd7050d10800e463d1167a0940078df7f6a Author: 翊名 <[email protected]> AuthorDate: Thu Nov 21 11:47:31 2019 +0800 feat(consumer) add pull consumer support --- .../rocketmq/MessagingAccessPointImpl.java | 5 +- .../org/apache/rocketmq/ons/api/ONSFactory.java | 26 ++ .../org/apache/rocketmq/ons/api/ONSFactoryAPI.java | 26 +- .../rocketmq/ons/api/impl/ONSFactoryImpl.java | 7 + .../ons/api/impl/rocketmq/PullConsumerImpl.java | 292 +++++++++++++++++++++ ons-core/pom.xml | 2 +- .../ons/sample/consumer/SimplePullConsumer.java | 63 +++++ .../producer/LocalTransactionCheckerImpl.java | 2 +- .../sample/producer/SimpleTransactionProducer.java | 4 +- pom.xml | 4 +- 10 files changed, 408 insertions(+), 23 deletions(-) diff --git a/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index bb90f64..c1b4a38 100644 --- a/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -21,7 +21,6 @@ import io.openmessaging.api.Consumer; import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMSBuiltinKeys; -import io.openmessaging.api.OMSResponseStatus; import io.openmessaging.api.Producer; import io.openmessaging.api.PullConsumer; import io.openmessaging.api.batch.BatchConsumer; @@ -43,6 +42,7 @@ import org.apache.rocketmq.ons.api.impl.rocketmq.ONSUtil; import org.apache.rocketmq.ons.api.impl.rocketmq.OrderConsumerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.OrderProducerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.ProducerImpl; +import org.apache.rocketmq.ons.api.impl.rocketmq.PullConsumerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.TransactionProducerImpl; public class MessagingAccessPointImpl implements MessagingAccessPoint { @@ 
-71,8 +71,7 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { @Override public PullConsumer createPullConsumer(Properties properties) { injectNameServerAddress(properties); - properties.put(PropertyKeyConst.NAMESRV_ADDR, this.attributes.getProperty(OMSBuiltinKeys.ACCESS_POINTS)); - throw OMSResponseStatus.generateException(OMSResponseStatus.STATUS_1101); + return new PullConsumerImpl(properties); } @Override diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java index 847a6ae..fe2768f 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.ons.api; import io.openmessaging.api.Consumer; import io.openmessaging.api.OMS; import io.openmessaging.api.Producer; +import io.openmessaging.api.PullConsumer; import io.openmessaging.api.batch.BatchConsumer; import io.openmessaging.api.order.OrderConsumer; import io.openmessaging.api.order.OrderProducer; @@ -222,4 +223,29 @@ public class ONSFactory { return onsFactory.createOrderedConsumer(properties); } + /** + * Create Pull Consumer + * <p> + * <code>properties</code> + * Requires: + * <ol> + * <li>{@link PropertyKeyConst#GROUP_ID}</li> + * <li>{@link PropertyKeyConst#AccessKey}</li> + * <li>{@link PropertyKeyConst#SecretKey}</li> + * <li>{@link PropertyKeyConst#ONSAddr}</li> + * </ol> + * Optional: + * <ul> + * <li>{@link PropertyKeyConst#ConsumeThreadNums}</li> + * <li>{@link PropertyKeyConst#OnsChannel}</li> + * </ul> + * </p> + * + * @param properties Consumer's configuration + * @return {@code PullConsumer} Thread safe {@link PullConsumer} instance + */ + public static PullConsumer createPullConsumer(final Properties properties) { + return onsFactory.createPullConsumer(properties); + } + } diff --git 
a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java index 67186b4..f9213a6 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java @@ -16,15 +16,16 @@ */ package org.apache.rocketmq.ons.api; -import io.openmessaging.api.Consumer; -import io.openmessaging.api.MessagingAccessPoint; -import io.openmessaging.api.Producer; -import io.openmessaging.api.batch.BatchConsumer; -import io.openmessaging.api.order.OrderConsumer; -import io.openmessaging.api.order.OrderProducer; -import io.openmessaging.api.transaction.LocalTransactionChecker; -import io.openmessaging.api.transaction.TransactionProducer; -import java.util.Properties; + import io.openmessaging.api.Consumer; + import io.openmessaging.api.MessagingAccessPoint; + import io.openmessaging.api.Producer; + import io.openmessaging.api.PullConsumer; + import io.openmessaging.api.batch.BatchConsumer; + import io.openmessaging.api.order.OrderConsumer; + import io.openmessaging.api.order.OrderProducer; + import io.openmessaging.api.transaction.LocalTransactionChecker; + import io.openmessaging.api.transaction.TransactionProducer; + import java.util.Properties; /** * {@link MessagingAccessPoint} is recommended. 
@@ -34,19 +35,16 @@ public interface ONSFactoryAPI { Producer createProducer(final Properties properties); - Consumer createConsumer(final Properties properties); - BatchConsumer createBatchConsumer(final Properties properties); - OrderProducer createOrderProducer(final Properties properties); - OrderConsumer createOrderedConsumer(final Properties properties); - TransactionProducer createTransactionProducer(final Properties properties, final LocalTransactionChecker checker); + + PullConsumer createPullConsumer(final Properties properties); } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java index 18ac9e4..5cded3a 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java @@ -20,6 +20,7 @@ import io.openmessaging.api.Consumer; import io.openmessaging.api.Message; import io.openmessaging.api.OMS; import io.openmessaging.api.Producer; +import io.openmessaging.api.PullConsumer; import io.openmessaging.api.batch.BatchConsumer; import io.openmessaging.api.order.OrderConsumer; import io.openmessaging.api.order.OrderProducer; @@ -38,6 +39,7 @@ import org.apache.rocketmq.ons.api.impl.rocketmq.ONSUtil; import org.apache.rocketmq.ons.api.impl.rocketmq.OrderConsumerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.OrderProducerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.ProducerImpl; +import org.apache.rocketmq.ons.api.impl.rocketmq.PullConsumerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.TransactionProducerImpl; /** @@ -89,4 +91,9 @@ public class ONSFactoryImpl implements ONSFactoryAPI { } }); } + + @Override + public PullConsumer createPullConsumer(Properties properties) { + return new PullConsumerImpl(properties); + } } diff --git 
a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java new file mode 100644 index 0000000..c3a5c4d --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api.impl.rocketmq; + +import io.openmessaging.api.Message; +import io.openmessaging.api.PullConsumer; +import io.openmessaging.api.TopicPartition; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyValueConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; +import org.apache.rocketmq.remoting.protocol.LanguageCode; + +public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer { + private final static InternalLogger LOGGER = ClientLoggerUtil.getClientLogger(); + private final static int MAX_CACHED_MESSAGE_SIZE_IN_MIB = 1024; + private final static int MIN_CACHED_MESSAGE_SIZE_IN_MIB = 16; + private final static int MAX_CACHED_MESSAGE_AMOUNT = 50000; + private final static int MIN_CACHED_MESSAGE_AMOUNT = 100; + + private DefaultLitePullConsumer litePullConsumer; + + private final String TOPIC_PARTITION_SPLITER = "#"; + + private int maxCachedMessageSizeInMiB = 512; + + private int maxCachedMessageAmount = 5000; + + public PullConsumerImpl(Properties properties) { + super(properties); + String 
consumerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.GROUP_ID)); + if (StringUtils.isEmpty(consumerGroup)) { + throw new ONSClientException("Unable to get GROUP_ID property"); + } + + this.litePullConsumer = + new DefaultLitePullConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials)); + + String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); + this.litePullConsumer.setMessageModel(MessageModel.valueOf(messageModel)); + + String maxBatchMessageCount = properties.getProperty(PropertyKeyConst.MAX_BATCH_MESSAGE_COUNT); + if (!UtilAll.isBlank(maxBatchMessageCount)) { + this.litePullConsumer.setPullBatchSize(Integer.valueOf(maxBatchMessageCount)); + } + + boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled, "false")); + this.litePullConsumer.setVipChannelEnabled(isVipChannelEnabled); + if (properties.containsKey(PropertyKeyConst.LANGUAGE_IDENTIFIER)) { + int language = Integer.valueOf(properties.get(PropertyKeyConst.LANGUAGE_IDENTIFIER).toString()); + byte languageByte = (byte) language; + this.litePullConsumer.setLanguage(LanguageCode.valueOf(languageByte)); + } + String instanceName = properties.getProperty(PropertyKeyConst.InstanceName, this.buildIntanceName()); + this.litePullConsumer.setInstanceName(instanceName); + this.litePullConsumer.setNamesrvAddr(this.getNameServerAddr()); + + String consumeThreadNums = properties.getProperty(PropertyKeyConst.ConsumeThreadNums); + if (!UtilAll.isBlank(consumeThreadNums)) { + this.litePullConsumer.setPullThreadNums(Integer.valueOf(consumeThreadNums)); + } + + String configuredCachedMessageAmount = properties.getProperty(PropertyKeyConst.MaxCachedMessageAmount); + if (!UtilAll.isBlank(configuredCachedMessageAmount)) { + maxCachedMessageAmount = Math.min(MAX_CACHED_MESSAGE_AMOUNT, Integer.valueOf(configuredCachedMessageAmount)); + 
maxCachedMessageAmount = Math.max(MIN_CACHED_MESSAGE_AMOUNT, maxCachedMessageAmount); + this.litePullConsumer.setPullThresholdForAll(maxCachedMessageAmount); + } + + String configuredCachedMessageSizeInMiB = properties.getProperty(PropertyKeyConst.MaxCachedMessageSizeInMiB); + if (!UtilAll.isBlank(configuredCachedMessageSizeInMiB)) { + maxCachedMessageSizeInMiB = Math.min(MAX_CACHED_MESSAGE_SIZE_IN_MIB, Integer.valueOf(configuredCachedMessageSizeInMiB)); + maxCachedMessageSizeInMiB = Math.max(MIN_CACHED_MESSAGE_SIZE_IN_MIB, maxCachedMessageSizeInMiB); + this.litePullConsumer.setPullThresholdSizeForQueue(maxCachedMessageSizeInMiB); + } + +// String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch); +// if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) { +// LOGGER.info("MQ Client Disable the Trace Hook!"); +// } else { +// try { +// Properties tempProperties = new Properties(); +// tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); +// tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); +// tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); +// tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); +// tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); +// tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); +// tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); +// tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name()); +// AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); +// dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl()); +// traceDispatcher = dispatcher; +// this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( +// new OnsConsumeMessageHookImpl(traceDispatcher)); +// } catch (Throwable e) { +// 
LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace data", e); +// } +// } + + } + + @Override protected void updateNameServerAddr(String nameServerAddresses) { + //TODO + } + + private Set<TopicPartition> convertToTopicPartitions(Collection<MessageQueue> messageQueues) { + Set<TopicPartition> topicPartitions = new HashSet<>(); + for (MessageQueue messageQueue : messageQueues) { + TopicPartition topicPartition = convertToTopicPartition(messageQueue); + topicPartitions.add(topicPartition); + } + return topicPartitions; + } + + private Set<MessageQueue> convertToMessageQueues(Collection<TopicPartition> topicPartitions) { + Set<MessageQueue> messageQueues = new HashSet<>(); + for (TopicPartition topicPartition : topicPartitions) { + messageQueues.add(convertToMessageQueue(topicPartition)); + } + return messageQueues; + } + + private TopicPartition convertToTopicPartition(MessageQueue messageQueue) { + String topic = messageQueue.getTopic(); + String partition = messageQueue.getBrokerName() + TOPIC_PARTITION_SPLITER + messageQueue.getQueueId(); + TopicPartition topicPartition = new TopicPartition(topic, partition); + return topicPartition; + } + + private MessageQueue convertToMessageQueue(TopicPartition topicPartition) { + String topic = topicPartition.getTopic(); + String[] tmp = topicPartition.getPartition().split(TOPIC_PARTITION_SPLITER); + if (tmp.length != 2) { + LOGGER.warn("Failed to get message queue from TopicPartition: {}", topicPartition); + throw new ONSClientException("Failed to get message queue"); + } + String brokerName = tmp[0]; + int queueId = Integer.valueOf(tmp[1]); + return new MessageQueue(topic, brokerName, queueId); + } + + @Override public Set<TopicPartition> topicPartitions(String topic) { + try { + Collection<MessageQueue> messageQueues = litePullConsumer.fetchMessageQueues(topic); + Set<TopicPartition> topicPartitions = new HashSet<>(); + for (MessageQueue messageQueue : messageQueues) { + 
topicPartitions.add(convertToTopicPartition(messageQueue)); + } + return topicPartitions; + } catch (MQClientException ex) { + throw new ONSClientException("defaultMQPushConsumer subscribe exception", ex); + } + } + + @Override public void assign(Collection<TopicPartition> topicPartitions) { + Set<MessageQueue> messageQueues = new HashSet<>(); + for (TopicPartition topicPartition : topicPartitions) { + messageQueues.add(convertToMessageQueue(topicPartition)); + } + this.litePullConsumer.assign(messageQueues); + } + + @Override public void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback) { + TopicMessageQueueChangeListener listener = new TopicMessageQueueChangeListener() { + @Override public void onChanged(String topic, Set<MessageQueue> messageQueues) { + callback.onChanged(convertToTopicPartitions(messageQueues)); + } + }; + try { + this.litePullConsumer.registerTopicMessageQueueChangeListener(topic, listener); + + } catch (MQClientException ex) { + LOGGER.warn("Register listener error", ex); + throw new ONSClientException("Failed to register topic partition listener"); + } + } + + @Override public List<Message> poll(long timeout) { + List<MessageExt> rmqMsgList = litePullConsumer.poll(timeout); + List<Message> msgList = new ArrayList<Message>(); + for (MessageExt rmqMsg : rmqMsgList) { + Message msg = ONSUtil.msgConvert(rmqMsg); + Map<String, String> propertiesMap = rmqMsg.getProperties(); + msg.setMsgID(rmqMsg.getMsgId()); + if (propertiesMap != null && propertiesMap.get(Constants.TRANSACTION_ID) != null) { + msg.setMsgID(propertiesMap.get(Constants.TRANSACTION_ID)); + } + msgList.add(msg); + } + return msgList; + } + + @Override public void seek(TopicPartition topicPartition, long offset) { + MessageQueue messageQueue = convertToMessageQueue(topicPartition); + try { + litePullConsumer.seek(messageQueue, offset); + } catch (MQClientException ex) { + LOGGER.warn("Topic partition: {} seek to: {} error", topicPartition, 
offset, ex); + throw new ONSClientException("Seek offset failed"); + } + } + + @Override public void seekToBeginning(TopicPartition topicPartition) { + //TODO + } + + @Override public void seekToEnd(TopicPartition topicPartition) { + //TODO + + } + + @Override public void pause(Collection<TopicPartition> topicPartitions) { + this.litePullConsumer.pause(convertToMessageQueues(topicPartitions)); + } + + @Override public void resume(Collection<TopicPartition> topicPartitions) { + this.litePullConsumer.resume(convertToMessageQueues(topicPartitions)); + } + + @Override public Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp) { + try { + return litePullConsumer.offsetForTimestamp(convertToMessageQueue(topicPartition), timestamp); + } catch (MQClientException ex) { + LOGGER.warn("Get offset for topic partition:{} with timestamp:{} error", topicPartition, timestamp, ex); + throw new ONSClientException("Failed to get offset"); + } + } + + @Override public Long committed(TopicPartition topicPartition) { + try { + return litePullConsumer.committed(convertToMessageQueue(topicPartition)); + } catch (MQClientException ex) { + LOGGER.warn("Get committed offset for topic partition: {} error", topicPartition, ex); + throw new ONSClientException("Failed to get committed offset"); + } + } + + @Override public void commitSync() { + litePullConsumer.commitSync(); + } + + @Override public void start() { + try { + if (this.started.compareAndSet(false, true)) { + this.litePullConsumer.start(); + super.start(); + } + } catch (Exception e) { + throw new ONSClientException(e.getMessage()); + } + } + + @Override public void shutdown() { + if (this.started.compareAndSet(true, false)) { + this.litePullConsumer.shutdown(); + } + super.shutdown(); + } +} diff --git a/ons-core/pom.xml b/ons-core/pom.xml index 7a3da6a..62851ac 100644 --- a/ons-core/pom.xml +++ b/ons-core/pom.xml @@ -44,7 +44,7 @@ <java_target_version>1.8</java_target_version> 
<file_encoding>UTF-8</file_encoding> <!-- Always use stable version of RocketMQ --> - <rocketmq.version>4.5.2</rocketmq.version> + <rocketmq.version>4.6.0</rocketmq.version> <auth.version>${project.version}</auth.version> <spring.version>4.1.2.RELEASE</spring.version> <diamond.version>3.7.4</diamond.version> diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java new file mode 100644 index 0000000..91bf280 --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimplePullConsumer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.sample.consumer; + +import io.openmessaging.api.Message; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; +import io.openmessaging.api.PullConsumer; +import io.openmessaging.api.TopicPartition; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.sample.MQConfig; + +public class SimplePullConsumer { + public static void main(String[] args) { + + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876"); + + Properties consumerProperties = new Properties(); + consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); + consumerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); + consumerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); + + PullConsumer consumer = messagingAccessPoint.createPullConsumer(consumerProperties); + /* + * Alternatively, you can use the ONSFactory to create instance directly. 
+ * <pre> + * {@code + * consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + * PullConsumer consumer = ONSFactory.createPullConsumer(consumerProperties); + * } + * </pre> + */ + + consumer.start(); + Set<TopicPartition> topicPartitions = consumer.topicPartitions(MQConfig.TOPIC); + consumer.assign(topicPartitions); + + while (true){ + List<Message> messages = consumer.poll(3000); + System.out.printf("Received message: %s %n", messages); + consumer.commitSync(); + } + + + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java index 973dc4d..bcfa454 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java @@ -24,7 +24,7 @@ public class LocalTransactionCheckerImpl implements LocalTransactionChecker { @Override public TransactionStatus check(Message msg) { - System.out.printf("Receive transaction check back request, MsgId: %s%n", msg.getMsgID()); + System.out.printf("Receive transaction check back request, MsgId: %s%n", msg); return TransactionStatus.CommitTransaction; } } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java index 44da502..2e41e22 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java @@ -60,8 +60,8 @@ public class SimpleTransactionProducer { SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, Object 
arg) { - System.out.printf("Execute local transaction and return TransactionStatus. %n"); - return TransactionStatus.CommitTransaction; + System.out.printf("Execute local transaction and return TransactionStatus. %s %n", msg); + return TransactionStatus.Unknow; } }, null); assert sendResult != null; diff --git a/pom.xml b/pom.xml index 71433c0..40f5660 100644 --- a/pom.xml +++ b/pom.xml @@ -45,8 +45,8 @@ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <file_encoding>UTF-8</file_encoding> <!-- Compiler settings properties --> - <maven.compiler.source>1.7</maven.compiler.source> - <maven.compiler.target>1.7</maven.compiler.target> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> <eagleeye.core.version>1.4.8</eagleeye.core.version> </properties> <modules>
