lhotari commented on code in PR #24663:
URL: https://github.com/apache/pulsar/pull/24663#discussion_r2298466547


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2365,9 +2367,15 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
                                     return null;
                                 });
                     } catch (Exception e) {
-                        log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}",
+                        log.warn("[{}] [{}] [{}] Failed to process DLQ message to {} for message id {}",
                                 topicName, subscription, consumerName, deadLetterPolicy.getDeadLetterTopic(), messageId,
                                 e);
+                        if (e instanceof SchemaSerializationException) {
+                            // If a SchemaSerializationException occurs, the message should be ignored to redelivery.
+                            // Otherwise, it will be redelivered and still can't send to the DLQ
+                            // and cause an infinite loop
+                            result.complete(true);

Review Comment:
   > shall I simply remove the schema validation in `AutoProduceBytesSchema` 
directly? My concern is that this might impact the current behavior for 
existing users.
   
   No, we shouldn't change the current behavior. We need another way to make validation configurable.
   A simple solution would be to add a setter for `requireSchemaValidation` to the `AutoProduceBytesSchema` class.
   
   Validation could then be disabled in the `org.apache.pulsar.client.impl.ConsumerImpl.initDeadLetterProducerIfNeeded` method like this:
   ```
                           AutoProduceBytesSchema<byte[]> autoProduceBytes =
                                   (AutoProduceBytesSchema) Schema.AUTO_PRODUCE_BYTES(schema);
                           autoProduceBytes.setRequireSchemaValidation(false);
                           ProducerBuilder<byte[]> builder =
                                   ((ProducerBuilderImpl<byte[]>) client.newProducer(autoProduceBytes))
   ```
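
To illustrate the idea without depending on the Pulsar client, here is a minimal, self-contained sketch of a pass-through bytes schema with a validation toggle. `PassThroughBytesSchema`, `PayloadValidator`, and `DlqValidationSketch` are hypothetical stand-ins, not the real `AutoProduceBytesSchema`; only the `requireSchemaValidation` flag and its setter mirror the suggestion above, and the "starts with `{`" rule is a toy substitute for real schema validation.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the wrapped schema's validation step.
interface PayloadValidator {
    void validate(byte[] payload); // throws IllegalArgumentException if invalid
}

// Simplified stand-in for AutoProduceBytesSchema; NOT the real Pulsar class.
final class PassThroughBytesSchema {
    private final PayloadValidator validator;
    // Defaults to true so existing callers keep the current behavior.
    private boolean requireSchemaValidation = true;

    PassThroughBytesSchema(PayloadValidator validator) {
        this.validator = validator;
    }

    void setRequireSchemaValidation(boolean requireSchemaValidation) {
        this.requireSchemaValidation = requireSchemaValidation;
    }

    // Bytes are forwarded unchanged; validation only decides whether to reject.
    byte[] encode(byte[] message) {
        if (requireSchemaValidation) {
            validator.validate(message);
        }
        return message;
    }
}

final class DlqValidationSketch {
    // Toy rule standing in for schema validation: payload must start with '{'.
    static PayloadValidator startsWithBrace() {
        return payload -> {
            if (payload.length == 0 || payload[0] != '{') {
                throw new IllegalArgumentException("payload does not match schema");
            }
        };
    }

    // Returns true when encode() accepts the payload, false when it is rejected.
    static boolean canEncode(PassThroughBytesSchema schema, byte[] payload) {
        try {
            schema.encode(payload);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] malformed = "not-json".getBytes(StandardCharsets.UTF_8);
        PassThroughBytesSchema schema = new PassThroughBytesSchema(startsWithBrace());

        System.out.println("default: " + canEncode(schema, malformed));
        schema.setRequireSchemaValidation(false); // what the DLQ producer would do
        System.out.println("validation disabled: " + canEncode(schema, malformed));
    }
}
```

With the default setting the malformed payload is rejected; once validation is disabled the same bytes pass through untouched. That is the property the DLQ producer needs, while every other user of the schema keeps the existing default.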
   
   


