Lanayx commented on a change in pull request #6449:
URL: https://github.com/apache/pulsar/pull/6449#discussion_r455394658
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -511,6 +527,129 @@ boolean markAckForBatchMessage(BatchMessageIdImpl
batchMessageId, AckType ackTyp
return sendAcknowledge(messageId, ackType, properties, txnImpl);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ protected CompletableFuture<Void> doReconsumeLater(Message<?> message,
AckType ackType,
+ Map<String,Long>
properties,
+ long delayTime,
+ TimeUnit unit) {
+ MessageId messageId = message.getMessageId();
+ if(messageId instanceof TopicMessageIdImpl) {
+ messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId();
+ }
+ checkArgument(messageId instanceof MessageIdImpl);
+ if (getState() != State.Ready && getState() != State.Connecting) {
+ stats.incrementNumAcksFailed();
+ PulsarClientException exception = new
PulsarClientException("Consumer not ready. State: " + getState());
+ if (AckType.Individual.equals(ackType)) {
+ onAcknowledge(messageId, exception);
+ } else if (AckType.Cumulative.equals(ackType)) {
+ onAcknowledgeCumulative(messageId, exception);
+ }
+ return FutureUtil.failedFuture(exception);
+ }
+ if (delayTime < 0) {
+ delayTime = 0;
+ }
+ if (retryLetterProducer == null) {
+ try {
+ createProducerLock.writeLock().lock();
+ if (retryLetterProducer == null) {
+ retryLetterProducer = client.newProducer(schema)
+ .topic(this.deadLetterPolicy.getRetryLetterTopic())
+ .enableBatching(false)
+ .blockIfQueueFull(false)
+ .create();
+ }
+ } catch (Exception e) {
+ log.error("Create retry letter producer exception with topic:
{}", deadLetterPolicy.getRetryLetterTopic(), e);
+ } finally {
+ createProducerLock.writeLock().unlock();
+ }
+ }
+ if (retryLetterProducer != null) {
+ try {
+ MessageImpl<T> retryMessage = null;
+ String originMessageIdStr = null;
+ String originTopicNameStr = null;
+ if (message instanceof TopicMessageImpl) {
+ retryMessage = (MessageImpl<T>) ((TopicMessageImpl<T>)
message).getMessage();
+ originMessageIdStr = ((TopicMessageIdImpl)
message.getMessageId()).getInnerMessageId().toString();
+ originTopicNameStr = ((TopicMessageIdImpl)
message.getMessageId()).getTopicName();
+ } else if (message instanceof MessageImpl) {
+ retryMessage = (MessageImpl<T>) message;
+ originMessageIdStr = ((MessageImpl<T>)
message).getMessageId().toString();
+ originTopicNameStr = ((MessageImpl<T>)
message).getTopicName();
+ }
+ SortedMap<String, String> propertiesMap = new TreeMap<>();
+ int reconsumetimes = 1;
+ if (message.getProperties() != null) {
+ propertiesMap.putAll(message.getProperties());
+ }
+
+ if
(propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
+ reconsumetimes =
Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
+ reconsumetimes = reconsumetimes + 1;
+
+ } else {
+
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC,
originTopicNameStr);
+
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID,
originMessageIdStr);
+ }
+
+
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES,
String.valueOf(reconsumetimes));
+ propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
String.valueOf(unit.toMillis(delayTime)));
+
+ if (reconsumetimes >
this.deadLetterPolicy.getMaxRedeliverCount()) {
+ processPossibleToDLQ((MessageIdImpl)messageId);
Review comment:
Does anyone know what the point of this line is? The logic for sending the
message to the dead letter consumer, as well as the message acknowledgment, is
already implemented more than twenty lines below. Why is it done twice?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org