Repository: incubator-rocketmq-site Updated Branches: refs/heads/master 4bdf27165 -> ca53fd770
Add openmessaging example. Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/ca53fd77 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/ca53fd77 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/ca53fd77 Branch: refs/heads/master Commit: ca53fd7705698457b969e08158724895d7c6f450 Parents: 4bdf271 Author: yukon <[email protected]> Authored: Thu Jun 15 10:56:23 2017 +0800 Committer: yukon <[email protected]> Committed: Thu Jun 15 10:59:21 2017 +0800 ---------------------------------------------------------------------- _data/navigation.yml | 2 + _docs/21-openmessaging-example.md | 131 +++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/ca53fd77/_data/navigation.yml ---------------------------------------------------------------------- diff --git a/_data/navigation.yml b/_data/navigation.yml index 7385b8c..238b5df 100644 --- a/_data/navigation.yml +++ b/_data/navigation.yml @@ -37,6 +37,8 @@ docs: url: /docs/filter-by-sql92-example/ - title: "Logappender Example" url: /docs/logappender-example/ + - title: "OpenMessaging Example" + url: /docs/openmessaging-example/ - title: "FAQ" url: /docs/faq/ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/ca53fd77/_docs/21-openmessaging-example.md ---------------------------------------------------------------------- diff --git a/_docs/21-openmessaging-example.md b/_docs/21-openmessaging-example.md new file mode 100644 index 0000000..ff7e6c1 --- /dev/null +++ b/_docs/21-openmessaging-example.md @@ -0,0 +1,131 @@ +--- +title: "OpenMessaging Example" +permalink: /docs/openmessaging-example/ +excerpt: "How to use OpenMessaging in RocketMQ." +modified: 2017-06-08T21:01:43-04:00 +--- + +{% include toc %} + +[OpenMessaging](https://openmessaging.github.io/), which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, e-commerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major platforms and operating systems. + +RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha, the following examples demonstrate how to access RocketMQ based on OpenMessaging. + +#### OMSProducer + +The following example shows how to send message to RocketMQ broker in synchronous, asynchronous, or one-way transmissions. + +```java +public class OMSProducer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final Producer producer = messagingAccessPoint.createProducer(); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + producer.startup(); + System.out.printf("Producer startup OK%n"); + + { + Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + SendResult sendResult = producer.send(message); + System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId()); + } + + { + final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + result.addListener(new PromiseListener<SendResult>() { + @Override + public void operationCompleted(Promise<SendResult> promise) { + System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); + } + + @Override + public void operationFailed(Promise<SendResult> promise) { + System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); + } + }); + } + + { + producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + System.out.printf("Send oneway message OK%n"); + } + + producer.shutdown(); + messagingAccessPoint.shutdown(); + } +} +``` + +#### OMSPullConsumer + +Use OMS PullConsumer to poll messages from a specified queue. + +```java +public class OMSPullConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + consumer.startup(); + System.out.printf("Consumer startup OK%n"); + + Message message = consumer.poll(); + if (message != null) { + String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + System.out.printf("Received one message: %s%n", msgId); + consumer.ack(msgId); + } + + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } +} +``` + +#### OMSPushConsumer + +Attaches OMS PushConsumer to a specified queue and consumes messages by MessageListener + +```java +public class OMSPushConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PushConsumer consumer = messagingAccessPoint. + createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); + context.ack(); + } + }); + + } +} +``` +
