MarvinCai commented on a change in pull request #9163:
URL: https://github.com/apache/pulsar/pull/9163#discussion_r561365524



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -320,26 +320,31 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
         }
 
         if (conf.getDeadLetterPolicy() != null) {
-            possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>();
-            if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
-                this.deadLetterPolicy = DeadLetterPolicy.builder()
-                        .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
-                        .deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic())
-                        .build();
+            // DLQ only supports non-ordered subscriptions, don't enable DLQ on Key_Shared subType since it require message ordering for given key.
+            if (conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
+                possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>();
             } else {
-                this.deadLetterPolicy = DeadLetterPolicy.builder()
-                        .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
-                        .deadLetterTopic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, topic, subscription))
-                        .build();
+                possibleSendToDeadLetterTopicMessages = null;
+            }
+            DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder()
+                    .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount());
+
+
+            if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
+                dlpBuilder.deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic());
+            } else if (conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
+                // Not setting a default DLQ is it's Key_Shared subType.
+                dlpBuilder.deadLetterTopic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+                        topic, subscription));
             }
 
             if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
-                this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                dlpBuilder.retryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic());
             } else {
-                this.deadLetterPolicy.setRetryLetterTopic(String.format("%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX,
+                dlpBuilder.retryLetterTopic(String.format("%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX,
                         topic, subscription));
             }
-
+            this.deadLetterPolicy = dlpBuilder.build();

Review comment:
       Actually, taking that back: the constructor of ZeroQueueConsumerImpl is
public, and it calls the constructor of ConsumerImpl. So a user could set a DLQ
on a Key_Shared subscription without knowing that we disabled it. I will also
throw an exception here.
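
For illustration, a minimal sketch of the fail-fast check described above. It is not the code from this PR: `SubType` and `DlqSubscriptionCheck` are hypothetical stand-ins so the example compiles on its own, and `IllegalArgumentException` is an assumption, since the comment does not say which exception type will be thrown.

```java
import java.util.Objects;

/** Hypothetical stand-in for the client's subscription type enum. */
enum SubType { Exclusive, Shared, Failover, Key_Shared }

/** Hypothetical helper showing the constructor-time guard; not part of Pulsar. */
final class DlqSubscriptionCheck {
    private DlqSubscriptionCheck() {}

    /**
     * Rejects a dead letter policy on a Key_Shared subscription instead of
     * silently ignoring it, so callers that reach the constructor directly
     * find out immediately that the combination is unsupported.
     */
    static void validate(SubType subType, boolean hasDeadLetterPolicy) {
        Objects.requireNonNull(subType, "subType");
        if (hasDeadLetterPolicy && subType == SubType.Key_Shared) {
            // Exception type is an assumption; the review comment does not name one.
            throw new IllegalArgumentException(
                    "Dead letter policy is not supported on Key_Shared subscriptions");
        }
    }

    public static void main(String[] args) {
        validate(SubType.Shared, true);      // accepted
        validate(SubType.Key_Shared, false); // accepted: no DLQ configured
        try {
            validate(SubType.Key_Shared, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing at construction time, rather than dropping the policy quietly, surfaces the unsupported combination to the caller at the point where the consumer is created.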



