RobertIndie commented on code in PR #18107:
URL: https://github.com/apache/pulsar/pull/18107#discussion_r1000071857
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java:
##########
@@ -2408,6 +2409,59 @@ public void testMaxProducersPerTopicUnlimited() throws
Exception {
}
}
+
+ @Test
+ public void testAcknowledgmentGroupSize() throws Exception {
+ final String namespace = "prop-xyz/ns2";
+ final String topicName = "persistent://" + namespace + "/topic1";
+ admin.namespaces().createNamespace(namespace, Set.of("test"));
+ int acknowledgmentGroupSize = 6;
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-sub")
+ .acknowledgmentGroupTime(10000, TimeUnit.SECONDS)
+ .maxAcknowledgmentGroupSize(acknowledgmentGroupSize)
+ .subscribe();
+
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
managedLedger.getCursors().iterator().next();
+
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ MessageIdImpl ackMessageId = new MessageIdImpl(-1, -1, -1);
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ if (msg != null) {
+ MessageId messageId = msg.getMessageId();
+ consumer.acknowledge(msg);
+ // When the acknowledgmentGroupSize message is confirmed, send
ack will be triggered
+ if (i == (acknowledgmentGroupSize - 1)) {
+ ackMessageId = (MessageIdImpl) messageId;
+ }
+ }
+ }
+
+ Thread.sleep(5000);
+ Position markDeletedPosition = cursor.getMarkDeletedPosition();
+ long ledgerId = markDeletedPosition.getLedgerId();
+ long entryId = markDeletedPosition.getEntryId();
+
+ assertEquals(ledgerId, ackMessageId.getLedgerId());
+ assertEquals(entryId, ackMessageId.getEntryId());
Review Comment:
It's better to use Awaitility here.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java:
##########
@@ -2408,6 +2409,59 @@ public void testMaxProducersPerTopicUnlimited() throws
Exception {
}
}
+
+ @Test
+ public void testAcknowledgmentGroupSize() throws Exception {
Review Comment:
It does not seem to be related to admin. Can we move this test to the
consumer related test?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java:
##########
@@ -327,6 +327,13 @@ public ConsumerBuilder<T> acknowledgmentGroupTime(long
delay, TimeUnit unit) {
return this;
}
+ @Override
+ public ConsumerBuilder<T> maxAcknowledgmentGroupSize(int messageNum) {
+ checkArgument(messageNum > 0, "acknowledgementsGroupSize needs to be >
0");
+ conf.setAcknowledgementsGroupSize(messageNum);
+ return this;
+ }
Review Comment:
We also need to add a test for the invalid configuration.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]