This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 6672a68 Add client quickstart example for java (#85)
6672a68 is described below
commit 6672a68de62cafb13e9fabc2c825492397450de5
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Jul 28 21:17:34 2022 +0800
Add client quickstart example for java (#85)
---
.gitignore | 3 -
.../client/java/example/AsyncProducerExample.java | 82 +++++++++++++++++++
.../java/example/AsyncSimpleConsumerExample.java | 95 ++++++++++++++++++++++
.../java/example/ProducerDelayMessageExample.java | 83 +++++++++++++++++++
.../java/example/ProducerFifoMessageExample.java | 81 ++++++++++++++++++
.../java/example/ProducerNormalMessageExample.java | 79 ++++++++++++++++++
.../example/ProducerTransactionMessageExample.java | 94 +++++++++++++++++++++
.../client/java/example/PushConsumerExample.java | 72 ++++++++++++++++
.../client/java/example/SimpleConsumerExample.java | 82 +++++++++++++++++++
9 files changed, 668 insertions(+), 3 deletions(-)
diff --git a/.gitignore b/.gitignore
index ee46af5..68a6ad8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,8 +25,5 @@ target/
dependency-reduced-pom.xml
.flattened-pom.xml
-# Java
-java/client/src/main/java/org/apache/rocketmq/client/java/example/
-
# C#
obj/
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
new file mode 100644
index 0000000..de0f390
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncProducerExample {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AsyncProducerExample.class);
+
+ private AsyncProducerExample() {
+ }
+
+ public static void main(String[] args) throws ClientException, IOException
{
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+ String accessKey = "yourAccessKey";
+ String secretKey = "yourSecretKey";
+ SessionCredentialsProvider sessionCredentialsProvider =
+ new StaticSessionCredentialsProvider(accessKey, secretKey);
+ String endpoints = "foobar.com:8081";
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setEndpoints(endpoints)
+ .setCredentialProvider(sessionCredentialsProvider)
+ .build();
+ String topic = "yourTopic";
+ final Producer producer = provider.newProducerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ // Set the topic name(s), which is optional. It makes producer
could prefetch the topic route before
+ // message publishing.
+ .setTopics(topic)
+ // May throw {@link ClientException} if the producer is not
initialized.
+ .build();
+ // Define your message body.
+ byte[] body = "This is a normal message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8);
+ String tag = "yourMessageTagA";
+ final Message message = provider.newMessageBuilder()
+ // Set topic for the current message.
+ .setTopic(topic)
+ // Message secondary classifier of message besides topic.
+ .setTag(tag)
+ // Key(s) of the message, another way to mark message besides
message id.
+ .setKeys("yourMessageKey-0e094a5f9d85")
+ .setBody(body)
+ .build();
+ final CompletableFuture<SendReceipt> future =
producer.sendAsync(message);
+ future.whenComplete((sendReceipt, throwable) -> {
+ if (null == throwable) {
+ LOGGER.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
+ } else {
+ LOGGER.error("Failed to send message", throwable);
+ }
+ });
+ // Close the producer when you don't need it anymore.
+ producer.close();
+ }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
new file mode 100644
index 0000000..2bda9b5
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Quickstart example that receives messages asynchronously with a simple consumer and acknowledges them.
+ *
+ * <p>The access key, secret key, endpoints, consumer group, tag and topic below are placeholders; replace them
+ * before running.
+ */
+public class AsyncSimpleConsumerExample {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSimpleConsumerExample.class);
+
+    private AsyncSimpleConsumerExample() {
+    }
+
+    /**
+     * Builds a simple consumer, receives one batch of messages asynchronously, acknowledges each of them, waits
+     * for all acknowledgements to settle and then closes the consumer.
+     *
+     * @param args command-line arguments, unused.
+     * @throws ClientException if the consumer could not be built.
+     * @throws IOException if an I/O error occurs (presumably while closing the consumer).
+     */
+    public static void main(String[] args) throws ClientException, IOException {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        String accessKey = "yourAccessKey";
+        String secretKey = "yourSecretKey";
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(accessKey, secretKey);
+        String endpoints = "foobar.com:8081";
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        String consumerGroup = "yourConsumerGroup";
+        Duration awaitDuration = Duration.ofSeconds(30);
+        String tag = "yourMessageTagA";
+        String topic = "yourTopic";
+        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // Set the consumer group name.
+            .setConsumerGroup(consumerGroup)
+            // Set the await duration for long-polling.
+            .setAwaitDuration(awaitDuration)
+            // Set the subscription for the consumer.
+            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+            .build();
+        try {
+            // Max message num for each long polling.
+            int maxMessageNum = 16;
+            // Set message invisible duration after it is received.
+            Duration invisibleDuration = Duration.ofSeconds(30);
+            final CompletableFuture<List<MessageView>> future0 =
+                consumer.receiveAsync(maxMessageNum, invisibleDuration);
+            future0.thenCompose(messages -> {
+                // Start one acknowledgement per received message, keyed by message id.
+                final Map<MessageId, CompletableFuture<Void>> map =
+                    messages.stream().collect(Collectors.toMap(MessageView::getMessageId, consumer::ackAsync));
+                final CompletableFuture<?>[] ackFutures = new CompletableFuture<?>[map.size()];
+                int index = 0;
+                for (Map.Entry<MessageId, CompletableFuture<Void>> entry : map.entrySet()) {
+                    final MessageId messageId = entry.getKey();
+                    // Each derived future completes normally once its outcome has been logged.
+                    ackFutures[index++] = entry.getValue()
+                        .thenAccept(v ->
+                            LOGGER.info("Message is acknowledged successfully, messageId={}", messageId))
+                        .exceptionally(throwable -> {
+                            LOGGER.error("Message is failed to be acknowledged, messageId={}",
+                                messageId, throwable);
+                            return null;
+                        });
+                }
+                // Completes once every acknowledgement has settled.
+                return CompletableFuture.allOf(ackFutures);
+            }).exceptionally(t -> {
+                LOGGER.error("Failed to receive message from remote", t);
+                return null;
+            })
+                // Wait for receive and acks; otherwise the consumer would be closed while they may still be
+                // in flight.
+                .join();
+        } finally {
+            // Close the simple consumer when you don't need it anymore, even if something above failed.
+            consumer.close();
+        }
+    }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
new file mode 100644
index 0000000..1fae792
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Quickstart example that publishes one delay message synchronously.
+ *
+ * <p>The access key, secret key, endpoints and topic below are placeholders; replace them before running.
+ */
+public class ProducerDelayMessageExample {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerDelayMessageExample.class);
+
+    private ProducerDelayMessageExample() {
+    }
+
+    /**
+     * Builds a producer, sends one message whose delivery timestamp is ten seconds in the future, logs the
+     * outcome and closes the producer.
+     *
+     * @param args command-line arguments, unused.
+     * @throws ClientException if the producer could not be built.
+     * @throws IOException if an I/O error occurs (presumably while closing the producer).
+     */
+    public static void main(String[] args) throws ClientException, IOException {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        String accessKey = "yourAccessKey";
+        String secretKey = "yourSecretKey";
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(accessKey, secretKey);
+        String endpoints = "foobar.com:8081";
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        String topic = "yourDelayTopic";
+        final Producer producer = provider.newProducerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // Set the topic name(s), which is optional. It lets the producer prefetch the topic route before
+            // message publishing.
+            .setTopics(topic)
+            // May throw {@link ClientException} if the producer is not initialized.
+            .build();
+        try {
+            // Define your message body.
+            byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
+            String tag = "yourMessageTagA";
+            Duration messageDelayTime = Duration.ofSeconds(10);
+            final Message message = provider.newMessageBuilder()
+                // Set topic for the current message.
+                .setTopic(topic)
+                // Message secondary classifier of message besides topic.
+                .setTag(tag)
+                // Key(s) of the message, another way to mark message besides message id.
+                .setKeys("yourMessageKey-3ee439f945d7")
+                // Set expected delivery timestamp of message.
+                .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis())
+                .setBody(body)
+                .build();
+            try {
+                final SendReceipt sendReceipt = producer.send(message);
+                LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+            } catch (Throwable t) {
+                LOGGER.error("Failed to send message", t);
+            }
+        } finally {
+            // Close the producer when you don't need it anymore, even if building the message failed.
+            producer.close();
+        }
+    }
+}
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
new file mode 100644
index 0000000..e087e46
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Quickstart example that publishes one FIFO message synchronously.
+ *
+ * <p>The access key, secret key, endpoints, topic and message group below are placeholders; replace them
+ * before running.
+ */
+public class ProducerFifoMessageExample {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);
+
+    private ProducerFifoMessageExample() {
+    }
+
+    /**
+     * Builds a producer, sends one message that carries a message group, logs the outcome and closes the
+     * producer.
+     *
+     * @param args command-line arguments, unused.
+     * @throws ClientException if the producer could not be built.
+     * @throws IOException if an I/O error occurs (presumably while closing the producer).
+     */
+    public static void main(String[] args) throws ClientException, IOException {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        String accessKey = "yourAccessKey";
+        String secretKey = "yourSecretKey";
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(accessKey, secretKey);
+        String endpoints = "foobar.com:8081";
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        String topic = "yourFifoTopic";
+        final Producer producer = provider.newProducerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // Set the topic name(s), which is optional. It lets the producer prefetch the topic route before
+            // message publishing.
+            .setTopics(topic)
+            // May throw {@link ClientException} if the producer is not initialized.
+            .build();
+        try {
+            // Define your message body.
+            byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
+            String tag = "yourMessageTagA";
+            final Message message = provider.newMessageBuilder()
+                // Set topic for the current message.
+                .setTopic(topic)
+                // Message secondary classifier of message besides topic.
+                .setTag(tag)
+                // Key(s) of the message, another way to mark message besides message id.
+                .setKeys("yourMessageKey-1ff69ada8e0e")
+                // Message group decides the message delivery order.
+                .setMessageGroup("youMessageGroup0")
+                .setBody(body)
+                .build();
+            try {
+                final SendReceipt sendReceipt = producer.send(message);
+                LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+            } catch (Throwable t) {
+                LOGGER.error("Failed to send message", t);
+            }
+        } finally {
+            // Close the producer when you don't need it anymore, even if building the message failed.
+            producer.close();
+        }
+    }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
new file mode 100644
index 0000000..c28800e
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Quickstart example that publishes one normal message synchronously.
+ *
+ * <p>The access key, secret key, endpoints and topic below are placeholders; replace them before running.
+ */
+public class ProducerNormalMessageExample {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerNormalMessageExample.class);
+
+    private ProducerNormalMessageExample() {
+    }
+
+    /**
+     * Builds a producer, sends one message, logs the outcome and closes the producer.
+     *
+     * @param args command-line arguments, unused.
+     * @throws ClientException if the producer could not be built.
+     * @throws IOException if an I/O error occurs (presumably while closing the producer).
+     */
+    public static void main(String[] args) throws ClientException, IOException {
+        String accessKey = "yourAccessKey";
+        String secretKey = "yourSecretKey";
+        String endpoints = "foobar.com:8081";
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(accessKey, secretKey);
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        String topic = "yourNormalTopic";
+        final Producer producer = provider.newProducerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // Set the topic name(s), which is optional. It lets the producer prefetch the topic route before
+            // message publishing.
+            .setTopics(topic)
+            // May throw {@link ClientException} if the producer is not initialized.
+            .build();
+        try {
+            // Define your message body.
+            byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
+            String tag = "yourMessageTagA";
+            final Message message = provider.newMessageBuilder()
+                // Set topic for the current message.
+                .setTopic(topic)
+                // Message secondary classifier of message besides topic.
+                .setTag(tag)
+                // Key(s) of the message, another way to mark message besides message id.
+                .setKeys("yourMessageKey-1c151062f96e")
+                .setBody(body)
+                .build();
+            try {
+                final SendReceipt sendReceipt = producer.send(message);
+                LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+            } catch (Throwable t) {
+                LOGGER.error("Failed to send message", t);
+            }
+        } finally {
+            // Close the producer when you don't need it anymore, even if building the message failed.
+            producer.close();
+        }
+    }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
new file mode 100644
index 0000000..ba71357
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.apis.producer.Transaction;
+import org.apache.rocketmq.client.apis.producer.TransactionChecker;
+import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Quickstart example that publishes one transaction message and commits the transaction.
+ *
+ * <p>The access key, secret key, endpoints and topic below are placeholders; replace them before running.
+ */
+public class ProducerTransactionMessageExample {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerTransactionMessageExample.class);
+
+    private ProducerTransactionMessageExample() {
+    }
+
+    /**
+     * Builds a producer with a transaction checker, sends one message inside a transaction, commits the
+     * transaction if the send succeeded and closes the producer on every path.
+     *
+     * @param args command-line arguments, unused.
+     * @throws ClientException if a client call outside the guarded send fails (e.g. building the producer).
+     * @throws IOException if an I/O error occurs (presumably while closing the producer).
+     */
+    public static void main(String[] args) throws ClientException, IOException {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        String accessKey = "yourAccessKey";
+        String secretKey = "yourSecretKey";
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(accessKey, secretKey);
+        String endpoints = "foobar.com:8081";
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        String topic = "yourTransactionTopic";
+        TransactionChecker checker = messageView -> {
+            LOGGER.info("Receive transactional message check, message={}", messageView);
+            // Return the transaction resolution according to your business logic.
+            return TransactionResolution.COMMIT;
+        };
+        Producer producer = provider.newProducerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // Set the topic name(s), which is optional. It lets the producer prefetch the topic route before
+            // message publishing.
+            .setTopics(topic)
+            // Set transactional checker.
+            .setTransactionChecker(checker)
+            .build();
+        try {
+            final Transaction transaction = producer.beginTransaction();
+            // Define your message body.
+            byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
+            String tag = "yourMessageTagA";
+            final Message message = provider.newMessageBuilder()
+                // Set topic for the current message.
+                .setTopic(topic)
+                // Message secondary classifier of message besides topic.
+                .setTag(tag)
+                // Key(s) of the message, another way to mark message besides message id.
+                .setKeys("yourMessageKey-565ef26f5727")
+                .setBody(body)
+                .build();
+            try {
+                final SendReceipt sendReceipt = producer.send(message, transaction);
+                LOGGER.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());
+            } catch (Throwable t) {
+                LOGGER.error("Failed to send message", t);
+                // Nothing to commit; the finally block below still closes the producer.
+                return;
+            }
+            // Commit the transaction.
+            transaction.commit();
+            // Or rollback the transaction.
+            // transaction.rollback();
+        } finally {
+            // Close the producer when you don't need it anymore, including the early-return and failure paths.
+            producer.close();
+        }
+    }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
new file mode 100644
index 0000000..79bfd3c
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Quickstart example that consumes messages with a push consumer.
+ *
+ * <p>The access key, secret key, endpoints, consumer group, tag and topic below are placeholders; replace them
+ * before running.
+ */
+public class PushConsumerExample {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
+
+    private PushConsumerExample() {
+    }
+
+    /**
+     * Builds a push consumer whose listener logs every message and reports success, then blocks the main
+     * thread until it is interrupted and closes the consumer.
+     *
+     * @param args command-line arguments, unused.
+     * @throws ClientException if the push consumer could not be built.
+     * @throws IOException if an I/O error occurs (presumably while closing the push consumer).
+     * @throws InterruptedException if the main thread is interrupted while sleeping.
+     */
+    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        String accessKey = "yourAccessKey";
+        String secretKey = "yourSecretKey";
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(accessKey, secretKey);
+        String endpoints = "foobar.com:8081";
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        String tag = "yourMessageTagA";
+        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+        String consumerGroup = "yourConsumerGroup";
+        String topic = "yourTopic";
+        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // Set the consumer group name.
+            .setConsumerGroup(consumerGroup)
+            // Set the subscription for the consumer.
+            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+            .setMessageListener(messageView -> {
+                // Handle the received message and return consume result.
+                LOGGER.info("Consume message={}", messageView);
+                return ConsumeResult.SUCCESS;
+            })
+            .build();
+        try {
+            // Block the main thread, no need for production environment.
+            Thread.sleep(Long.MAX_VALUE);
+        } finally {
+            // Close the push consumer when you don't need it anymore, also when the sleep is interrupted.
+            pushConsumer.close();
+        }
+    }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
new file mode 100644
index 0000000..afbef23
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleConsumerExample {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleConsumerExample.class);
+
+ private SimpleConsumerExample() {
+ }
+
+ public static void main(String[] args) throws ClientException, IOException
{
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+ String accessKey = "yourAccessKey";
+ String secretKey = "yourSecretKey";
+ SessionCredentialsProvider sessionCredentialsProvider =
+ new StaticSessionCredentialsProvider(accessKey, secretKey);
+ String endpoints = "foobar.com:8081";
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setEndpoints(endpoints)
+ .setCredentialProvider(sessionCredentialsProvider)
+ .build();
+ String consumerGroup = "yourConsumerGroup";
+ Duration awaitDuration = Duration.ofSeconds(30);
+ String tag = "yourMessageTagA";
+ String topic = "yourTopic";
+ FilterExpression filterExpression = new FilterExpression(tag,
FilterExpressionType.TAG);
+ SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ // Set the consumer group name.
+ .setConsumerGroup(consumerGroup)
+ // set await duration for long-polling.
+ .setAwaitDuration(awaitDuration)
+ // Set the subscription for the consumer.
+ .setSubscriptionExpressions(Collections.singletonMap(topic,
filterExpression))
+ .build();
+ // Max message num for each long polling.
+ int maxMessageNum = 16;
+ // Set message invisible duration after it is received.
+ Duration invisibleDuration = Duration.ofSeconds(30);
+ final List<MessageView> messages = consumer.receive(maxMessageNum,
invisibleDuration);
+ for (MessageView message : messages) {
+ try {
+ consumer.ack(message);
+ } catch (Throwable t) {
+ LOGGER.error("Failed to acknowledge message, messageId={}",
message.getMessageId(), t);
+ }
+ }
+ // Close the simple consumer when you don't need it anymore.
+ consumer.close();
+ }
+}