Lanayx commented on a change in pull request #6449:
URL: https://github.com/apache/pulsar/pull/6449#discussion_r455394658



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -511,6 +527,129 @@ boolean markAckForBatchMessage(BatchMessageIdImpl 
batchMessageId, AckType ackTyp
         return sendAcknowledge(messageId, ackType, properties, txnImpl);
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    protected CompletableFuture<Void> doReconsumeLater(Message<?> message, 
AckType ackType,
+                                                       Map<String,Long> 
properties, 
+                                                       long delayTime,
+                                                       TimeUnit unit) {
+        MessageId messageId = message.getMessageId();
+        if(messageId instanceof TopicMessageIdImpl) {
+            messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId();
+        }
+        checkArgument(messageId instanceof MessageIdImpl);
+        if (getState() != State.Ready && getState() != State.Connecting) {
+            stats.incrementNumAcksFailed();
+            PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
+            if (AckType.Individual.equals(ackType)) {
+                onAcknowledge(messageId, exception);
+            } else if (AckType.Cumulative.equals(ackType)) {
+                onAcknowledgeCumulative(messageId, exception);
+            }
+            return FutureUtil.failedFuture(exception);
+        }
+        if (delayTime < 0) {
+            delayTime = 0;
+        }
+        if (retryLetterProducer == null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (retryLetterProducer == null) {
+                    retryLetterProducer = client.newProducer(schema)
+                            .topic(this.deadLetterPolicy.getRetryLetterTopic())
+                            .enableBatching(false)
+                            .blockIfQueueFull(false)
+                            .create();
+                }
+            } catch (Exception e) {
+                log.error("Create retry letter producer exception with topic: 
{}", deadLetterPolicy.getRetryLetterTopic(), e);
+            } finally {
+                createProducerLock.writeLock().unlock();
+            }
+        }
+        if (retryLetterProducer != null) {
+            try {
+                MessageImpl<T> retryMessage = null;
+                String originMessageIdStr = null;
+                String originTopicNameStr = null;
+                if (message instanceof TopicMessageImpl) {
+                    retryMessage = (MessageImpl<T>) ((TopicMessageImpl<T>) 
message).getMessage();
+                    originMessageIdStr = ((TopicMessageIdImpl) 
message.getMessageId()).getInnerMessageId().toString();
+                    originTopicNameStr = ((TopicMessageIdImpl) 
message.getMessageId()).getTopicName();
+                } else if (message instanceof MessageImpl) {
+                    retryMessage = (MessageImpl<T>) message;
+                    originMessageIdStr = ((MessageImpl<T>) 
message).getMessageId().toString();
+                    originTopicNameStr =  ((MessageImpl<T>) 
message).getTopicName();
+                }
+                SortedMap<String, String> propertiesMap = new TreeMap<>();
+                int reconsumetimes = 1;
+                if (message.getProperties() != null) {
+                    propertiesMap.putAll(message.getProperties());
+                }
+
+                if 
(propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
+                    reconsumetimes = 
Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
+                    reconsumetimes = reconsumetimes + 1;
+                   
+                } else {
+                    
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, 
originTopicNameStr);
+                    
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, 
originMessageIdStr);
+                }
+
+                
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, 
String.valueOf(reconsumetimes));
+                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, 
String.valueOf(unit.toMillis(delayTime)));
+                
+               if (reconsumetimes > 
this.deadLetterPolicy.getMaxRedeliverCount()) {
+                   processPossibleToDLQ((MessageIdImpl)messageId);

Review comment:
       Does anyone know what the point of this line is? The logic for sending 
the message to the dead letter queue, as well as the message acknowledgment, 
appears more than twenty lines below. Why is it done twice?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to