congbobo184 commented on code in PR #19051:
URL: https://github.com/apache/pulsar/pull/19051#discussion_r1056924662
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -128,6 +133,100 @@ public void testRetryTopic() throws Exception {
checkConsumer.close();
}
+ @Data
+ public static class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ }
+
+ @Data
+ public static class FooV2 {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ @Nullable
+ private String field3;
+ }
+
+ @Test(timeOut = 20000)
+ public void testAutoConsumeSchemaRetryLetter() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/retry-letter-topic";
+ final String subName = "my-subscription";
+ final int maxRedeliveryCount = 1;
+ final int sendMessages = 10;
+
+ admin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
+ Consumer<FooV2> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.AVRO(FooV2.class))
+ .topic(topic + "-" + subName + "-DLQ")
+ .subscriptionName("my-subscription")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer =
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+ .topic(topic)
+ .create();
+ Set<String> messageIds = new HashSet<>();
+ for (int i = 0; i < sendMessages; i++) {
+ if (i % 2 == 0) {
+ Foo foo = new Foo();
+ foo.field1 = i + "";
+ foo.field2 = i + "";
+
messageIds.add(producer.newMessage(Schema.AVRO(Foo.class)).value(foo).send().toString());
+ } else {
+ FooV2 foo = new FooV2();
+ foo.field1 = i + "";
+ foo.field2 = i + "";
+ foo.field3 = i + "";
+
messageIds.add(producer.newMessage(Schema.AVRO(FooV2.class)).value(foo).send().toString());
+ }
+ }
+ Consumer<GenericRecord> consumer =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .enableRetry(true)
+ .receiverQueueSize(100)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
Review Comment:
We don't need to test dead-letter handling here, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]