lhotari commented on code in PR #24663:
URL: https://github.com/apache/pulsar/pull/24663#discussion_r2298584661


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java:
##########
@@ -1548,4 +1548,50 @@ public void 
testDeadLetterTopicWithMaxUnackedMessagesBlocking() throws Exception
         consumer.close();
 
     }
+
+    // reproduce issue reported in 
https://github.com/apache/pulsar/issues/24541
+    @Test
+    public void sendDeadLetterTopicWithMismatchSchemaProducer() throws 
Exception {
+        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+        admin.namespaces().createNamespace(namespace);
+        // don't enforce schema validation
+        admin.namespaces().setSchemaValidationEnforced(namespace, false);
+        // set schema compatibility strategy to always compatible
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+                + "/sendDeadLetterTopicWithMismatchSchemaProducer");
+
+        // create topics
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createNonPartitionedTopic(topic + "-DLQ");
+        admin.topics().createNonPartitionedTopic(topic + "-RETRY");
+
+        final int maxRedeliverCount = 1;
+        final String subscriptionName = "my-subscription";
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.AVRO(String.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subscriptionName)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .deadLetterTopic(topic + "-DLQ")
+                        .retryLetterTopic(topic + "-RETRY")
+                        .maxRedeliverCount(maxRedeliverCount)
+                        .build())
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .messageListener((Consumer::negativeAcknowledge))
+                .subscribe();
+
+        Producer<Long> producer = 
pulsarClient.newProducer(Schema.AVRO(Long.class)).topic(topic).create();
+        producer.send(1234567890L);
+
+        Thread.sleep(3000L);
+
+        assertThat(pulsar.getBrokerService().getTopicReference(topic).get()
+                
.getSubscription(subscriptionName).getConsumers().get(0).getMessageRedeliverCounter())
+                .describedAs("redeliver count of topic %s should be less than 
or equal to 2 because of mismatch schema",
+                        topic)
+                .isLessThanOrEqualTo(maxRedeliverCount + 1);
+        producer.close();
+        consumer.close();

Review Comment:
   At the end, it would be useful to test that it's possible to read the sent 
message from the DLQ using `Schema.AUTO_CONSUME` since it would be the way to 
handle such DLQs where there could be messages from different schemas in the 
same topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to