3pacccccc commented on code in PR #24663:
URL: https://github.com/apache/pulsar/pull/24663#discussion_r2298595834


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java:
##########
@@ -1548,4 +1548,50 @@ public void 
testDeadLetterTopicWithMaxUnackedMessagesBlocking() throws Exception
         consumer.close();
 
     }
+
+    // reproduce issue reported in 
https://github.com/apache/pulsar/issues/24541
+    @Test
+    public void sendDeadLetterTopicWithMismatchSchemaProducer() throws 
Exception {
+        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+        admin.namespaces().createNamespace(namespace);
+        // don't enforce schema validation
+        admin.namespaces().setSchemaValidationEnforced(namespace, false);
+        // set schema compatibility strategy to always compatible
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+                + "/sendDeadLetterTopicWithMismatchSchemaProducer");
+
+        // create topics
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createNonPartitionedTopic(topic + "-DLQ");
+        admin.topics().createNonPartitionedTopic(topic + "-RETRY");
+
+        final int maxRedeliverCount = 1;
+        final String subscriptionName = "my-subscription";
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.AVRO(String.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subscriptionName)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .deadLetterTopic(topic + "-DLQ")
+                        .retryLetterTopic(topic + "-RETRY")
+                        .maxRedeliverCount(maxRedeliverCount)
+                        .build())
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .messageListener((Consumer::negativeAcknowledge))
+                .subscribe();
+
+        Producer<Long> producer = 
pulsarClient.newProducer(Schema.AVRO(Long.class)).topic(topic).create();
+        producer.send(1234567890L);
+
+        Thread.sleep(3000L);
+
+        assertThat(pulsar.getBrokerService().getTopicReference(topic).get()
+                
.getSubscription(subscriptionName).getConsumers().get(0).getMessageRedeliverCounter())
+                .describedAs("redeliver count of topic %s should be less than 
or equal to 2 because of mismatch schema",
+                        topic)
+                .isLessThanOrEqualTo(maxRedeliverCount + 1);
+        producer.close();
+        consumer.close();

Review Comment:
   > it would be useful to test that it's possible to read the sent message 
from the DLQ using `Schema.AUTO_CONSUME` since it would be the way to handle 
such DLQs where there could be messages from different schemas in the same 
topic.
   
   OK, I'll add test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to