This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 934949c Refine the simple consumer example (#193)
934949c is described below
commit 934949c4186a5fde2eb27a9125ca8e4fe3cd38ca
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Aug 28 14:17:28 2022 +0800
Refine the simple consumer example (#193)
---
.../rocketmq/client/java/example/AsyncSimpleConsumerExample.java | 7 ++++---
.../rocketmq/client/java/example/SimpleConsumerExample.java | 8 ++++++--
2 files changed, 10 insertions(+), 5 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 3cabaa4..42da480 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -74,11 +74,12 @@ public class AsyncSimpleConsumerExample {
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
- Duration invisibleDuration = Duration.ofSeconds(5);
+ Duration invisibleDuration = Duration.ofSeconds(15);
final CompletableFuture<List<MessageView>> future0 =
consumer.receiveAsync(maxMessageNum, invisibleDuration);
- future0.thenAccept(message -> {
+ future0.thenAccept(messages -> {
+ LOGGER.info("Received {} message(s)", messages.size());
final Map<MessageId, CompletableFuture<Void>> map =
-
message.stream().collect(Collectors.toMap(MessageView::getMessageId,
consumer::ackAsync));
+
messages.stream().collect(Collectors.toMap(MessageView::getMessageId,
consumer::ackAsync));
for (Map.Entry<MessageId, CompletableFuture<Void>> entry :
map.entrySet()) {
final MessageId messageId = entry.getKey();
final CompletableFuture<Void> future = entry.getValue();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index 63a035a..a42d4c3 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -29,6 +29,7 @@ import
org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,13 +71,16 @@ public class SimpleConsumerExample {
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
- Duration invisibleDuration = Duration.ofSeconds(5);
+ Duration invisibleDuration = Duration.ofSeconds(15);
final List<MessageView> messages = consumer.receive(maxMessageNum,
invisibleDuration);
+ LOGGER.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
+ final MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
+ LOGGER.info("Message is acknowledged successfully,
messageId={}", messageId);
} catch (Throwable t) {
- LOGGER.error("Failed to acknowledge message, messageId={}",
message.getMessageId(), t);
+ LOGGER.error("Message is failed to be acknowledged,
messageId={}", messageId, t);
}
}
// Close the simple consumer when you don't need it anymore.