This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit 316399b4cadf630b7e6634c380c529de613bbd6f Author: duhenglucky <[email protected]> AuthorDate: Mon Jul 1 16:45:52 2019 +0800 Add rocketmq-ons demo --- ons-sample/pom.xml | 27 +++++++++++ .../org/apache/rocketmq/ons/sample/MQConfig.java | 32 +++++++++++++ .../ons/sample/consumer/MessageListenerImpl.java | 27 +++++++++++ .../ons/sample/consumer/SimpleMQConsumer.java | 45 ++++++++++++++++++ .../ons/sample/consumer/SimpleOrderConsumer.java | 54 ++++++++++++++++++++++ .../producer/LocalTransactionCheckerImpl.java | 15 ++++++ .../ons/sample/producer/MQTimerProducer.java | 50 ++++++++++++++++++++ .../ons/sample/producer/SimpleMQProducer.java | 54 ++++++++++++++++++++++ .../ons/sample/producer/SimpleOrderProducer.java | 52 +++++++++++++++++++++ .../sample/producer/SimpleTransactionProducer.java | 47 +++++++++++++++++++ pom.xml | 1 + 11 files changed, 404 insertions(+) diff --git a/ons-sample/pom.xml b/ons-sample/pom.xml new file mode 100644 index 0000000..71a7972 --- /dev/null +++ b/ons-sample/pom.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ons-parent</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>1.8.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>ons-sample</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>ons-api</artifactId> + <version>1.8.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>ons-client</artifactId> + <version>1.8.1-SNAPSHOT</version> + </dependency> + </dependencies> + + +</project> \ No newline at end of file diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java new file mode 100644 index 0000000..17937d4 --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2010-2016 Alibaba Group Holding Limited + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.rocketmq.ons.sample; + +public class MQConfig { + /** + * RocketMQ config + */ + public static final String TOPIC = "xxxx"; + public static final String GROUP_ID = "GID-xxxx"; + public static final String ORDER_TOPIC = "xxxx"; + public static final String ORDER_GROUP_ID = "GID-xxxx"; + public static final String ACCESS_KEY = "xxxx"; + public static final String SECRET_KEY = "xxxxxxxxxxx"; + public static final String TAG = "*"; + + /** + * NAMESRV_ADDR + */ + public static final String NAMESRV_ADDR = "xxxxxx"; + +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/MessageListenerImpl.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/MessageListenerImpl.java new file mode 100644 index 0000000..f51d16f --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/MessageListenerImpl.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2010-2016 Alibaba Group Holding Limited + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.rocketmq.ons.sample.consumer; + +import java.util.Date; +import org.apache.rocketmq.ons.api.Action; +import org.apache.rocketmq.ons.api.ConsumeContext; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.MessageListener; + +public class MessageListenerImpl implements MessageListener { + @Override + public Action consume(Message message, ConsumeContext consumeContext) { + System.out.println(new Date() + " Receive message, Topic is:" + message.getTopic() + ", MsgId is:" + message.getMsgID()); + return Action.CommitMessage; + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java new file mode 100644 index 0000000..4c4f58b --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2010-2016 Alibaba Group Holding Limited + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.sample.consumer; + + +import java.util.Properties; +import org.apache.rocketmq.ons.api.Consumer; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.sample.MQConfig; + +public class SimpleMQConsumer { + + public static void main(String[] args) { + Properties consumerProperties = new Properties(); + consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); + consumerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); + consumerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); + consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + Consumer consumer = ONSFactory.createConsumer(consumerProperties); + consumer.subscribe(MQConfig.TOPIC, MQConfig.TAG, new MessageListenerImpl()); + consumer.start(); + System.out.println("Consumer start success."); + + //等待固定时间防止进程退出 + try { + Thread.sleep(200000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java new file mode 100644 index 0000000..ea5159f --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2010-2016 Alibaba Group Holding Limited + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.sample.consumer; + +import java.util.Properties; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.order.ConsumeOrderContext; +import org.apache.rocketmq.ons.api.order.MessageOrderListener; +import org.apache.rocketmq.ons.api.order.OrderAction; +import org.apache.rocketmq.ons.api.order.OrderConsumer; +import org.apache.rocketmq.ons.sample.MQConfig; + +public class SimpleOrderConsumer { + + public static void main(String[] args) { + Properties consumerProperties = new Properties(); + consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.ORDER_GROUP_ID); + consumerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); + consumerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); + consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + OrderConsumer consumer = ONSFactory.createOrderedConsumer(consumerProperties); + consumer.subscribe(MQConfig.ORDER_TOPIC, MQConfig.TAG, new MessageOrderListener() { + + @Override + public OrderAction consume(final Message message, final ConsumeOrderContext context) { + System.out.println(message); + return OrderAction.Success; + } + }); + consumer.start(); + System.out.println("Consumer start success."); + + try { + Thread.sleep(200000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java new file mode 100644 index 0000000..32a756f --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java @@ -0,0 +1,15 @@ +package org.apache.rocketmq.ons.sample.producer; + +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.transaction.LocalTransactionChecker; +import org.apache.rocketmq.ons.api.transaction.TransactionStatus; + + +public class LocalTransactionCheckerImpl implements LocalTransactionChecker { + + @Override + public TransactionStatus check(Message msg) { + System.out.println("Receive transaction check back request, MsgId: " + msg.getMsgID()); + return TransactionStatus.CommitTransaction; + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java new file mode 100644 index 0000000..1f70ad3 --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2010-2016 Alibaba Group Holding Limited + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.rocketmq.ons.sample.producer; + +import java.util.Date; +import java.util.Properties; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.Producer; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.sample.MQConfig; + +public class MQTimerProducer { + public static void main(String[] args) { + Properties producerProperties = new Properties(); + producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); + producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); + producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); + producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + Producer producer = ONSFactory.createProducer(producerProperties); + producer.start(); + System.out.println("Producer Started"); + + for (int i = 0; i < 10; i++) { + Message message = new Message(MQConfig.TOPIC, MQConfig.TAG, "MQ send timer message test".getBytes()); + long delayTime = 3000; + message.setStartDeliverTime(System.currentTimeMillis() + delayTime); + try { + SendResult sendResult = producer.send(message); + assert sendResult != null; + System.out.println(new Date() + " Send mq timer message success! Topic is: " + MQConfig.TOPIC + " msgId is: " + sendResult.getMessageId()); + } catch (ONSClientException e) { + System.out.println(new Date() + " Send mq message failed. Topic is:" + MQConfig.TOPIC); + e.printStackTrace(); + } + } + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java new file mode 100644 index 0000000..e397a00 --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2010-2016 Alibaba Group Holding Limited + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.sample.producer; + + +import java.util.Date; +import java.util.Properties; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.Producer; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.sample.MQConfig; + +public class SimpleMQProducer { + + + public static void main(String[] args) { + Properties producerProperties = new Properties(); + producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); + producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); + producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); + producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + Producer producer = ONSFactory.createProducer(producerProperties); + producer.start(); + System.out.println("Producer Started"); + + for (int i = 0; i < 10; i++) { + Message message = new Message(MQConfig.TOPIC, MQConfig.TAG, "mq send transaction message test".getBytes()); + try { + SendResult sendResult = producer.send(message); + assert sendResult != null; + System.out.println(new Date() + " Send mq message success! Topic is: " + MQConfig.TOPIC + " msgId is: " + sendResult.getMessageId()); + } catch (ONSClientException e) { + System.out.println(new Date() + " Send mq message failed! Topic is: " + MQConfig.TOPIC); + e.printStackTrace(); + } + } + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java new file mode 100644 index 0000000..550005c --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2010-2016 Alibaba Group Holding Limited + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.rocketmq.ons.sample.producer; + +import java.util.Date; +import java.util.Properties; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.order.OrderProducer; +import org.apache.rocketmq.ons.sample.MQConfig; + +public class SimpleOrderProducer { + + public static void main(String[] args) { + Properties producerProperties = new Properties(); + producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.ORDER_GROUP_ID); + producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); + producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); + producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + OrderProducer producer = ONSFactory.createOrderProducer(producerProperties); + producer.start(); + System.out.println("Producer Started"); + + for (int i = 0; i < 10; i++) { + Message msg = new Message(MQConfig.ORDER_TOPIC, MQConfig.TAG, "MQ send order message test".getBytes()); + String orderId = "biz_" + i % 10; + msg.setKey(orderId); + String shardingKey = String.valueOf(orderId); + try { + SendResult sendResult = producer.send(msg, shardingKey); + assert sendResult != null; + System.out.println(new Date() + " Send mq message success! Topic is: " + MQConfig.ORDER_TOPIC + " msgId is: " + sendResult.getMessageId()); + } catch (ONSClientException e) { + System.out.println(new Date() + " Send mq message failed! Topic is: " + MQConfig.TOPIC); + e.printStackTrace(); + } + } + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java new file mode 100644 index 0000000..bbb97e3 --- /dev/null +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java @@ -0,0 +1,47 @@ +package org.apache.rocketmq.ons.sample.producer; + +import java.util.Date; +import java.util.Properties; +import org.apache.rocketmq.ons.api.Message; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.SendResult; +import org.apache.rocketmq.ons.api.exception.ONSClientException; +import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter; +import org.apache.rocketmq.ons.api.transaction.TransactionProducer; +import org.apache.rocketmq.ons.api.transaction.TransactionStatus; +import org.apache.rocketmq.ons.sample.MQConfig; + +public class SimpleTransactionProducer { + + public static void main(String[] args) { + Properties tranProducerProperties = new Properties(); + tranProducerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); + tranProducerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); + tranProducerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); + tranProducerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl(); + TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(tranProducerProperties, localTransactionChecker); + transactionProducer.start(); + + Message message = new Message(MQConfig.TOPIC, MQConfig.TAG, "MQ send transaction message test".getBytes()); + + for (int i = 0; i < 10; i++) { + try { + SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecuter() { + @Override + public TransactionStatus execute(Message msg, Object arg) { + System.out.println("Execute local transaction and return TransactionStatus."); + return TransactionStatus.CommitTransaction; + } + }, null); + assert sendResult != null; + } catch (ONSClientException e) { + System.out.println(new Date() + " Send mq message failed! Topic is:" + MQConfig.TOPIC); + e.printStackTrace(); + } + } + + System.out.println("Send transaction message success."); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9718937..59fdb13 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ </properties> <modules> <module>ons-core</module> + <module>ons-sample</module> </modules> <build> <plugins>
