This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new bc32cbd4 Polish example of simple consumer (#351)
bc32cbd4 is described below
commit bc32cbd440f70cb71eb223a3b94c775852740fa9
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Feb 6 11:07:53 2023 +0800
Polish example of simple consumer (#351)
---
.../java/example/AsyncSimpleConsumerExample.java | 60 +++++++++++-----------
.../client/java/example/SimpleConsumerExample.java | 29 ++++++-----
2 files changed, 47 insertions(+), 42 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 7cd462e6..51f3ccf8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.java.example;
-import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -45,7 +44,8 @@ public class AsyncSimpleConsumerExample {
private AsyncSimpleConsumerExample() {
}
- public static void main(String[] args) throws ClientException,
IOException, InterruptedException {
+ @SuppressWarnings({"resource", "InfiniteLoopStatement"})
+ public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
@@ -81,34 +81,36 @@ public class AsyncSimpleConsumerExample {
ExecutorService receiveCallbackExecutor =
Executors.newCachedThreadPool();
// Set individual thread pool for ack callback.
ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
- final CompletableFuture<List<MessageView>> future0 =
consumer.receiveAsync(maxMessageNum, invisibleDuration);
- future0.whenCompleteAsync(((messages, throwable) -> {
- if (null != throwable) {
- log.error("Failed to receive message from remote", throwable);
- // Return early.
- return;
- }
- log.info("Received {} message(s)", messages.size());
- // Using messageView as key rather than message id because message
id may be duplicated.
- final Map<MessageView, CompletableFuture<Void>> map =
- messages.stream().collect(Collectors.toMap(message -> message,
consumer::ackAsync));
- for (Map.Entry<MessageView, CompletableFuture<Void>> entry :
map.entrySet()) {
- final MessageId messageId = entry.getKey().getMessageId();
- final CompletableFuture<Void> future = entry.getValue();
- future.whenCompleteAsync((v, t) -> {
- if (null != t) {
- log.error("Message is failed to be acknowledged,
messageId={}", messageId, t);
- // Return early.
- return;
- }
- log.info("Message is acknowledged successfully,
messageId={}", messageId);
- }, ackCallbackExecutor);
- }
+ // Receive message.
+ do {
+ final CompletableFuture<List<MessageView>> future0 =
consumer.receiveAsync(maxMessageNum,
+ invisibleDuration);
+ future0.whenCompleteAsync(((messages, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to receive message from remote",
throwable);
+ // Return early.
+ return;
+ }
+ log.info("Received {} message(s)", messages.size());
+ // Using messageView as key rather than message id because
message id may be duplicated.
+ final Map<MessageView, CompletableFuture<Void>> map =
+ messages.stream().collect(Collectors.toMap(message ->
message, consumer::ackAsync));
+ for (Map.Entry<MessageView, CompletableFuture<Void>> entry :
map.entrySet()) {
+ final MessageId messageId = entry.getKey().getMessageId();
+ final CompletableFuture<Void> future = entry.getValue();
+ future.whenCompleteAsync((v, t) -> {
+ if (null != t) {
+ log.error("Message is failed to be acknowledged,
messageId={}", messageId, t);
+ // Return early.
+ return;
+ }
+ log.info("Message is acknowledged successfully,
messageId={}", messageId);
+ }, ackCallbackExecutor);
+ }
- }), receiveCallbackExecutor);
- // Block to avoid exist of background threads.
- Thread.sleep(Long.MAX_VALUE);
+ }), receiveCallbackExecutor);
+ } while (true);
// Close the simple consumer when you don't need it anymore.
- consumer.close();
+ // consumer.close();
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index c3f06a1a..d69253c0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.java.example;
-import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -40,7 +39,8 @@ public class SimpleConsumerExample {
private SimpleConsumerExample() {
}
- public static void main(String[] args) throws ClientException, IOException
{
+ @SuppressWarnings({"resource", "InfiniteLoopStatement"})
+ public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
@@ -72,18 +72,21 @@ public class SimpleConsumerExample {
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
- final List<MessageView> messages = consumer.receive(maxMessageNum,
invisibleDuration);
- log.info("Received {} message(s)", messages.size());
- for (MessageView message : messages) {
- final MessageId messageId = message.getMessageId();
- try {
- consumer.ack(message);
- log.info("Message is acknowledged successfully, messageId={}",
messageId);
- } catch (Throwable t) {
- log.error("Message is failed to be acknowledged,
messageId={}", messageId, t);
+ // Receive message, multi-threading is more recommended.
+ do {
+ final List<MessageView> messages = consumer.receive(maxMessageNum,
invisibleDuration);
+ log.info("Received {} message(s)", messages.size());
+ for (MessageView message : messages) {
+ final MessageId messageId = message.getMessageId();
+ try {
+ consumer.ack(message);
+ log.info("Message is acknowledged successfully,
messageId={}", messageId);
+ } catch (Throwable t) {
+ log.error("Message is failed to be acknowledged,
messageId={}", messageId, t);
+ }
}
- }
+ } while (true);
// Close the simple consumer when you don't need it anymore.
- consumer.close();
+ // consumer.close();
}
}